From cb0f9841553f6e6139a361f290bc6aba0dac6c8e Mon Sep 17 00:00:00 2001
From: liliu-z <105927039+liliu-z@users.noreply.github.com>
Date: Tue, 8 Apr 2025 17:36:30 +0800
Subject: [PATCH] enhance: Revert "separate for index completed (#40873)"
(#41152)
This reverts commit 23e579e3240a30397f05f5b308be687f6f16b013. #40873
issue: #39519
Signed-off-by: Li Liu
---
internal/core/src/clustering/file_utils.h | 26 ++
internal/core/src/common/Common.cpp | 9 +
internal/core/src/common/Common.h | 4 +
internal/core/src/common/Consts.h | 1 +
internal/core/src/common/LoadInfo.h | 1 -
internal/core/src/common/init_c.cpp | 10 +
internal/core/src/common/init_c.h | 3 +
internal/core/src/index/BitmapIndex.cpp | 5 +-
internal/core/src/index/HybridScalarIndex.cpp | 3 +-
.../core/src/index/InvertedIndexTantivy.cpp | 10 +-
.../src/index/JsonKeyStatsInvertedIndex.cpp | 3 +-
internal/core/src/index/ScalarIndexSort.cpp | 3 +-
internal/core/src/index/StringIndexMarisa.cpp | 3 +-
internal/core/src/index/TextMatchIndex.cpp | 6 +-
internal/core/src/index/VectorDiskIndex.cpp | 3 +-
internal/core/src/index/VectorMemIndex.cpp | 26 +-
.../src/segcore/ChunkedSegmentSealedImpl.cpp | 18 +-
.../core/src/segcore/SegmentGrowingImpl.cpp | 56 ++--
.../core/src/segcore/SegmentSealedImpl.cpp | 17 +-
internal/core/src/segcore/Types.h | 2 -
internal/core/src/segcore/Utils.cpp | 14 +-
internal/core/src/segcore/Utils.h | 13 +-
.../core/src/segcore/load_field_data_c.cpp | 6 -
internal/core/src/segcore/load_field_data_c.h | 3 -
internal/core/src/segcore/load_index_c.cpp | 13 +-
internal/core/src/segcore/segment_c.cpp | 7 +-
.../core/src/storage/DiskFileManagerImpl.cpp | 42 +--
.../core/src/storage/DiskFileManagerImpl.h | 21 +-
.../core/src/storage/MemFileManagerImpl.cpp | 5 +-
.../core/src/storage/MemFileManagerImpl.h | 4 +-
internal/core/src/storage/ThreadPools.h | 10 +-
internal/core/src/storage/Util.cpp | 21 +-
internal/core/src/storage/Util.h | 16 +-
.../core/unittest/test_array_bitmap_index.cpp | 1 -
internal/core/unittest/test_bitmap_index.cpp | 1 -
.../unittest/test_disk_file_manager_test.cpp | 3 +-
internal/core/unittest/test_hybrid_index.cpp | 1 -
internal/core/unittest/test_indexing.cpp | 6 -
.../core/unittest/test_inverted_index.cpp | 4 +-
.../unittest/test_json_key_stats_index.cpp | 1 -
internal/core/unittest/test_utils/DataGen.h | 3 +-
.../unittest/test_utils/storage_test_utils.h | 2 +-
internal/datacoord/stats_task_meta.go | 3 +-
internal/indexnode/indexnode.go | 2 +
internal/querycoordv2/balance/balance.go | 1 -
internal/querycoordv2/balance/utils.go | 2 -
.../querycoordv2/checkers/index_checker.go | 2 -
.../querycoordv2/checkers/segment_checker.go | 73 ++----
.../checkers/segment_checker_test.go | 240 ------------------
internal/querycoordv2/handlers.go | 1 -
internal/querycoordv2/meta/replica.go | 32 +--
internal/querycoordv2/meta/replica_manager.go | 12 +-
internal/querycoordv2/task/executor.go | 1 -
internal/querycoordv2/task/task.go | 15 +-
internal/querycoordv2/task/task_test.go | 23 +-
internal/querycoordv2/task/utils_test.go | 2 -
internal/querynodev2/segments/segment.go | 14 +-
.../querynodev2/segments/segment_loader.go | 3 +-
internal/querynodev2/server.go | 2 +
internal/util/segcore/requests.go | 8 +-
pkg/proto/cgo_msg.proto | 1 -
pkg/proto/cgopb/cgo_msg.pb.go | 14 +-
pkg/proto/index_cgo_msg.proto | 2 -
pkg/proto/indexcgopb/index_cgo_msg.pb.go | 26 +-
pkg/proto/query_coord.proto | 1 -
pkg/proto/querypb/query_coord.pb.go | 14 +-
66 files changed, 224 insertions(+), 676 deletions(-)
diff --git a/internal/core/src/clustering/file_utils.h b/internal/core/src/clustering/file_utils.h
index f37d86d79a..f5e8b966c7 100644
--- a/internal/core/src/clustering/file_utils.h
+++ b/internal/core/src/clustering/file_utils.h
@@ -39,4 +39,30 @@ AddClusteringResultFiles(milvus::storage::ChunkManager* remote_chunk_manager,
map[remote_prefix] = data_size;
}
+void
+RemoveClusteringResultFiles(
+ milvus::storage::ChunkManager* remote_chunk_manager,
+ const std::unordered_map<std::string, int64_t>& map) {
+ auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);
+ std::vector<std::future<void>> futures;
+
+ for (auto& [file_path, file_size] : map) {
+ futures.push_back(pool.Submit(
+ [&, path = file_path]() { remote_chunk_manager->Remove(path); }));
+ }
+ std::exception_ptr first_exception = nullptr;
+ for (auto& future : futures) {
+ try {
+ future.get();
+ } catch (...) {
+ if (!first_exception) {
+ first_exception = std::current_exception();
+ }
+ }
+ }
+ if (first_exception) {
+ std::rethrow_exception(first_exception);
+ }
+}
+
} // namespace milvus::clustering
diff --git a/internal/core/src/common/Common.cpp b/internal/core/src/common/Common.cpp
index 837bf7204c..2984868534 100644
--- a/internal/core/src/common/Common.cpp
+++ b/internal/core/src/common/Common.cpp
@@ -22,6 +22,8 @@ namespace milvus {
int64_t FILE_SLICE_SIZE = DEFAULT_INDEX_FILE_SLICE_SIZE;
int64_t HIGH_PRIORITY_THREAD_CORE_COEFFICIENT =
DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT;
+int64_t MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT =
+ DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
int64_t LOW_PRIORITY_THREAD_CORE_COEFFICIENT =
DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
int CPU_NUM = DEFAULT_CPU_NUM;
@@ -44,6 +46,13 @@ SetHighPriorityThreadCoreCoefficient(const int64_t coefficient) {
HIGH_PRIORITY_THREAD_CORE_COEFFICIENT);
}
+void
+SetMiddlePriorityThreadCoreCoefficient(const int64_t coefficient) {
+ MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient;
+ LOG_INFO("set middle priority thread pool core coefficient: {}",
+ MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT);
+}
+
void
SetLowPriorityThreadCoreCoefficient(const int64_t coefficient) {
LOW_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient;
diff --git a/internal/core/src/common/Common.h b/internal/core/src/common/Common.h
index 9cd9909c4b..3b7722fd0c 100644
--- a/internal/core/src/common/Common.h
+++ b/internal/core/src/common/Common.h
@@ -25,6 +25,7 @@ namespace milvus {
extern int64_t FILE_SLICE_SIZE;
extern int64_t HIGH_PRIORITY_THREAD_CORE_COEFFICIENT;
+extern int64_t MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
extern int64_t LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
extern int CPU_NUM;
extern int64_t EXEC_EVAL_EXPR_BATCH_SIZE;
@@ -37,6 +38,9 @@ SetIndexSliceSize(const int64_t size);
void
SetHighPriorityThreadCoreCoefficient(const int64_t coefficient);
+void
+SetMiddlePriorityThreadCoreCoefficient(const int64_t coefficient);
+
void
SetLowPriorityThreadCoreCoefficient(const int64_t coefficient);
diff --git a/internal/core/src/common/Consts.h b/internal/core/src/common/Consts.h
index edd0dde60a..3548e2d3ce 100644
--- a/internal/core/src/common/Consts.h
+++ b/internal/core/src/common/Consts.h
@@ -57,6 +57,7 @@ const char DEFAULT_TASK_ID[] = "0";
const int64_t DEFAULT_FIELD_MAX_MEMORY_LIMIT = 128 << 20; // bytes
const int64_t DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT = 10;
+const int64_t DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT = 5;
const int64_t DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT = 1;
const int64_t DEFAULT_INDEX_FILE_SLICE_SIZE = 16 << 20; // bytes
diff --git a/internal/core/src/common/LoadInfo.h b/internal/core/src/common/LoadInfo.h
index 7dc2691456..00774b8162 100644
--- a/internal/core/src/common/LoadInfo.h
+++ b/internal/core/src/common/LoadInfo.h
@@ -40,7 +40,6 @@ struct LoadFieldDataInfo {
std::string mmap_dir_path = "";
std::string url;
int64_t storage_version = 0;
- bool recovering = false;
};
struct LoadDeletedRecordInfo {
diff --git a/internal/core/src/common/init_c.cpp b/internal/core/src/common/init_c.cpp
index 1c9bcc3454..b0058e4e8e 100644
--- a/internal/core/src/common/init_c.cpp
+++ b/internal/core/src/common/init_c.cpp
@@ -45,6 +45,16 @@ InitHighPriorityThreadCoreCoefficient(const int64_t value) {
value);
}
+void
+InitMiddlePriorityThreadCoreCoefficient(const int64_t value) {
+ std::call_once(
+ flag4,
+ [](int64_t value) {
+ milvus::SetMiddlePriorityThreadCoreCoefficient(value);
+ },
+ value);
+}
+
void
InitLowPriorityThreadCoreCoefficient(const int64_t value) {
std::call_once(
diff --git a/internal/core/src/common/init_c.h b/internal/core/src/common/init_c.h
index 0e114646f2..1895667621 100644
--- a/internal/core/src/common/init_c.h
+++ b/internal/core/src/common/init_c.h
@@ -30,6 +30,9 @@ InitIndexSliceSize(const int64_t);
void
InitHighPriorityThreadCoreCoefficient(const int64_t);
+void
+InitMiddlePriorityThreadCoreCoefficient(const int64_t);
+
void
InitLowPriorityThreadCoreCoefficient(const int64_t);
diff --git a/internal/core/src/index/BitmapIndex.cpp b/internal/core/src/index/BitmapIndex.cpp
index 5b8977c831..716faeb32e 100644
--- a/internal/core/src/index/BitmapIndex.cpp
+++ b/internal/core/src/index/BitmapIndex.cpp
@@ -569,13 +569,12 @@ BitmapIndex::LoadWithoutAssemble(const BinarySet& binary_set,
template <typename T>
void
BitmapIndex::Load(milvus::tracer::TraceContext ctx, const Config& config) {
- LOG_INFO("load bitmap index with config {}", config.dump());
+ LOG_DEBUG("load bitmap index with config {}", config.dump());
auto index_files =
GetValueFromConfig<std::vector<std::string>>(config, "index_files");
AssertInfo(index_files.has_value(),
"index file paths is empty when load bitmap index");
- auto index_datas = file_manager_->LoadIndexToMemory(
- index_files.value(), config[milvus::THREAD_POOL_PRIORITY]);
+ auto index_datas = file_manager_->LoadIndexToMemory(index_files.value());
AssembleIndexDatas(index_datas);
BinarySet binary_set;
for (auto& [key, data] : index_datas) {
diff --git a/internal/core/src/index/HybridScalarIndex.cpp b/internal/core/src/index/HybridScalarIndex.cpp
index 4f2c61904b..304fea6706 100644
--- a/internal/core/src/index/HybridScalarIndex.cpp
+++ b/internal/core/src/index/HybridScalarIndex.cpp
@@ -361,8 +361,7 @@ HybridScalarIndex::Load(milvus::tracer::TraceContext ctx,
auto index_type_file = GetRemoteIndexTypeFile(index_files.value());
auto index_datas = mem_file_manager_->LoadIndexToMemory(
- std::vector<std::string>{index_type_file},
- config[milvus::THREAD_POOL_PRIORITY]);
+ std::vector<std::string>{index_type_file});
AssembleIndexDatas(index_datas);
BinarySet binary_set;
for (auto& [key, data] : index_datas) {
diff --git a/internal/core/src/index/InvertedIndexTantivy.cpp b/internal/core/src/index/InvertedIndexTantivy.cpp
index 7fe384c47e..caafa1b55a 100644
--- a/internal/core/src/index/InvertedIndexTantivy.cpp
+++ b/internal/core/src/index/InvertedIndexTantivy.cpp
@@ -214,16 +214,15 @@ InvertedIndexTantivy::Load(milvus::tracer::TraceContext ctx,
}
}
- auto index_datas = mem_file_manager_->LoadIndexToMemory(
- null_offset_files, config[milvus::THREAD_POOL_PRIORITY]);
+ auto index_datas =
+ mem_file_manager_->LoadIndexToMemory(null_offset_files);
AssembleIndexDatas(index_datas);
null_offset_data = index_datas.at(INDEX_NULL_OFFSET_FILE_NAME);
} else if (auto it = find_file(INDEX_NULL_OFFSET_FILE_NAME);
it != inverted_index_files.end()) {
// null offset file is not sliced
null_offset_files.push_back(*it);
- auto index_datas = mem_file_manager_->LoadIndexToMemory(
- {*it}, config[milvus::THREAD_POOL_PRIORITY]);
+ auto index_datas = mem_file_manager_->LoadIndexToMemory({*it});
null_offset_data = index_datas.at(INDEX_NULL_OFFSET_FILE_NAME);
}
@@ -245,8 +244,7 @@ InvertedIndexTantivy::Load(milvus::tracer::TraceContext ctx,
file) != null_offset_files.end();
}),
inverted_index_files.end());
- disk_file_manager_->CacheIndexToDisk(inverted_index_files,
- config[milvus::THREAD_POOL_PRIORITY]);
+ disk_file_manager_->CacheIndexToDisk(inverted_index_files);
path_ = prefix;
wrapper_ = std::make_shared<TantivyIndexWrapper>(prefix.c_str());
}
diff --git a/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp b/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp
index 307456d299..a3bbd5cbb8 100644
--- a/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp
+++ b/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp
@@ -383,8 +383,7 @@ JsonKeyStatsInvertedIndex::Load(milvus::tracer::TraceContext ctx,
index_file = remote_prefix + "/" + index_file;
}
}
- disk_file_manager_->CacheJsonKeyIndexToDisk(
- index_files.value(), config[milvus::THREAD_POOL_PRIORITY]);
+ disk_file_manager_->CacheJsonKeyIndexToDisk(index_files.value());
AssertInfo(
tantivy_index_exist(path_.c_str()), "index not exist: {}", path_);
wrapper_ = std::make_shared<TantivyIndexWrapper>(path_.c_str());
diff --git a/internal/core/src/index/ScalarIndexSort.cpp b/internal/core/src/index/ScalarIndexSort.cpp
index ff370fb8f9..1fd10d2118 100644
--- a/internal/core/src/index/ScalarIndexSort.cpp
+++ b/internal/core/src/index/ScalarIndexSort.cpp
@@ -206,8 +206,7 @@ ScalarIndexSort::Load(milvus::tracer::TraceContext ctx,
GetValueFromConfig<std::vector<std::string>>(config, "index_files");
AssertInfo(index_files.has_value(),
"index file paths is empty when load disk ann index");
- auto index_datas = file_manager_->LoadIndexToMemory(
- index_files.value(), config[milvus::THREAD_POOL_PRIORITY]);
+ auto index_datas = file_manager_->LoadIndexToMemory(index_files.value());
AssembleIndexDatas(index_datas);
BinarySet binary_set;
for (auto& [key, data] : index_datas) {
diff --git a/internal/core/src/index/StringIndexMarisa.cpp b/internal/core/src/index/StringIndexMarisa.cpp
index d56864293b..3182a8bda5 100644
--- a/internal/core/src/index/StringIndexMarisa.cpp
+++ b/internal/core/src/index/StringIndexMarisa.cpp
@@ -233,8 +233,7 @@ StringIndexMarisa::Load(milvus::tracer::TraceContext ctx,
GetValueFromConfig<std::vector<std::string>>(config, "index_files");
AssertInfo(index_files.has_value(),
"index file paths is empty when load index");
- auto index_datas = file_manager_->LoadIndexToMemory(
- index_files.value(), config[milvus::THREAD_POOL_PRIORITY]);
+ auto index_datas = file_manager_->LoadIndexToMemory(index_files.value());
AssembleIndexDatas(index_datas);
BinarySet binary_set;
for (auto& [key, data] : index_datas) {
diff --git a/internal/core/src/index/TextMatchIndex.cpp b/internal/core/src/index/TextMatchIndex.cpp
index 3db642548a..ee69d91044 100644
--- a/internal/core/src/index/TextMatchIndex.cpp
+++ b/internal/core/src/index/TextMatchIndex.cpp
@@ -136,8 +136,7 @@ TextMatchIndex::Load(const Config& config) {
std::vector<std::string> file;
file.push_back(*it);
files_value.erase(it);
- auto index_datas = mem_file_manager_->LoadIndexToMemory(
- file, config[milvus::THREAD_POOL_PRIORITY]);
+ auto index_datas = mem_file_manager_->LoadIndexToMemory(file);
AssembleIndexDatas(index_datas);
BinarySet binary_set;
for (auto& [key, data] : index_datas) {
@@ -154,8 +153,7 @@ TextMatchIndex::Load(const Config& config) {
index_valid_data->data.get(),
(size_t)index_valid_data->size);
}
- disk_file_manager_->CacheTextLogToDisk(
- files_value, config[milvus::THREAD_POOL_PRIORITY]);
+ disk_file_manager_->CacheTextLogToDisk(files_value);
AssertInfo(
tantivy_index_exist(prefix.c_str()), "index not exist: {}", prefix);
wrapper_ = std::make_shared<TantivyIndexWrapper>(prefix.c_str());
diff --git a/internal/core/src/index/VectorDiskIndex.cpp b/internal/core/src/index/VectorDiskIndex.cpp
index 51af03f849..ac0314420c 100644
--- a/internal/core/src/index/VectorDiskIndex.cpp
+++ b/internal/core/src/index/VectorDiskIndex.cpp
@@ -96,8 +96,7 @@ VectorDiskAnnIndex::Load(milvus::tracer::TraceContext ctx,
GetValueFromConfig<std::vector<std::string>>(config, "index_files");
AssertInfo(index_files.has_value(),
"index file paths is empty when load disk ann index data");
- file_manager_->CacheIndexToDisk(index_files.value(),
- config[milvus::THREAD_POOL_PRIORITY]);
+ file_manager_->CacheIndexToDisk(index_files.value());
read_file_span->End();
}
diff --git a/internal/core/src/index/VectorMemIndex.cpp b/internal/core/src/index/VectorMemIndex.cpp
index 2c635118d6..7c492f7b3a 100644
--- a/internal/core/src/index/VectorMemIndex.cpp
+++ b/internal/core/src/index/VectorMemIndex.cpp
@@ -210,8 +210,8 @@ VectorMemIndex::Load(milvus::tracer::TraceContext ctx,
std::string index_file_prefix = slice_meta_filepath.substr(
0, slice_meta_filepath.find_last_of('/') + 1);
- auto result = file_manager_->LoadIndexToMemory(
- {slice_meta_filepath}, config[milvus::THREAD_POOL_PRIORITY]);
+ auto result =
+ file_manager_->LoadIndexToMemory({slice_meta_filepath});
auto raw_slice_meta = result[INDEX_FILE_SLICE_META];
Config meta_data = Config::parse(
std::string(static_cast<const char*>(raw_slice_meta->Data()),
@@ -231,8 +231,7 @@ VectorMemIndex::Load(milvus::tracer::TraceContext ctx,
batch.push_back(index_file_prefix + file_name);
}
- auto batch_data = file_manager_->LoadIndexToMemory(
- batch, config[milvus::THREAD_POOL_PRIORITY]);
+ auto batch_data = file_manager_->LoadIndexToMemory(batch);
for (const auto& file_path : batch) {
const std::string file_name =
file_path.substr(file_path.find_last_of('/') + 1);
@@ -254,10 +253,9 @@ VectorMemIndex::Load(milvus::tracer::TraceContext ctx,
}
if (!pending_index_files.empty()) {
- auto result = file_manager_->LoadIndexToMemory(
- std::vector<std::string>(pending_index_files.begin(),
- pending_index_files.end()),
- config[milvus::THREAD_POOL_PRIORITY]);
+ auto result =
+ file_manager_->LoadIndexToMemory(std::vector<std::string>(
+ pending_index_files.begin(), pending_index_files.end()));
for (auto&& index_data : result) {
index_datas.insert(std::move(index_data));
}
@@ -569,8 +567,7 @@ void VectorMemIndex::LoadFromFile(const Config& config) {
std::vector<std::string> batch{};
batch.reserve(parallel_degree);
- auto result = file_manager_->LoadIndexToMemory(
- {slice_meta_filepath}, config[milvus::THREAD_POOL_PRIORITY]);
+ auto result = file_manager_->LoadIndexToMemory({slice_meta_filepath});
auto raw_slice_meta = result[INDEX_FILE_SLICE_META];
Config meta_data = Config::parse(
std::string(static_cast<const char*>(raw_slice_meta->Data()),
@@ -582,8 +579,7 @@ void VectorMemIndex::LoadFromFile(const Config& config) {
auto total_len = static_cast<size_t>(item[TOTAL_LEN]);
auto HandleBatch = [&](int index) {
auto start_load2_mem = std::chrono::system_clock::now();
- auto batch_data = file_manager_->LoadIndexToMemory(
- batch, config[milvus::THREAD_POOL_PRIORITY]);
+ auto batch_data = file_manager_->LoadIndexToMemory(batch);
load_duration_sum +=
(std::chrono::system_clock::now() - start_load2_mem);
for (int j = index - batch.size() + 1; j <= index; j++) {
@@ -621,10 +617,8 @@ void VectorMemIndex::LoadFromFile(const Config& config) {
} else {
//1. load files into memory
auto start_load_files2_mem = std::chrono::system_clock::now();
- auto result = file_manager_->LoadIndexToMemory(
- std::vector<std::string>(pending_index_files.begin(),
- pending_index_files.end()),
- config[milvus::THREAD_POOL_PRIORITY]);
+ auto result = file_manager_->LoadIndexToMemory(std::vector<std::string>(
+ pending_index_files.begin(), pending_index_files.end()));
load_duration_sum +=
(std::chrono::system_clock::now() - start_load_files2_mem);
//2. write data into files
diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp
index eaa127118c..b02810b405 100644
--- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp
+++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp
@@ -243,17 +243,15 @@ ChunkedSegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& load_info) {
auto parallel_degree = static_cast<uint64_t>(
DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
field_data_info.arrow_reader_channel->set_capacity(parallel_degree * 2);
- auto priority = load_info.recovering ? milvus::ThreadPoolPriority::HIGH
- : milvus::ThreadPoolPriority::LOW;
- LoadArrowReaderFromRemote(
- insert_files, field_data_info.arrow_reader_channel, priority);
+ auto& pool =
+ ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);
+ pool.Submit(LoadArrowReaderFromRemote,
+ insert_files,
+ field_data_info.arrow_reader_channel);
- LOG_INFO(
- "segment {} submits load field {} task to thread pool, "
- "recovering:{}",
- this->get_segment_id(),
- field_id.get(),
- load_info.recovering);
+ LOG_INFO("segment {} submits load field {} task to thread pool",
+ this->get_segment_id(),
+ field_id.get());
bool use_mmap = false;
if (!info.enable_mmap ||
SystemProperty::Instance().IsSystem(field_id)) {
diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp
index 08fda5d0a3..20a9939073 100644
--- a/internal/core/src/segcore/SegmentGrowingImpl.cpp
+++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp
@@ -222,8 +222,6 @@ SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) {
AssertInfo(infos.field_infos.find(primary_field_id.get()) !=
infos.field_infos.end(),
"primary field data should be included");
- auto priority = infos.recovering ? milvus::ThreadPoolPriority::HIGH
- : milvus::ThreadPoolPriority::LOW;
size_t num_rows = storage::GetNumRowsForLoadInfo(infos);
auto reserved_offset = PreInsert(num_rows);
@@ -236,31 +234,29 @@ SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) {
return std::stol(a.substr(a.find_last_of('/') + 1)) <
std::stol(b.substr(b.find_last_of('/') + 1));
});
- LOG_INFO(
- "segment {} loads field {} with num_rows {}, insert_files_count:{}",
- this->get_segment_id(),
- field_id.get(),
- num_rows,
- insert_files.size());
auto channel = std::make_shared<FieldDataChannel>();
- LoadFieldDatasFromRemote(insert_files, channel, priority);
- auto loaded_field_datas_info =
- storage::CollectFieldDataChannelWithInfos(channel);
- LOG_INFO(
- "segment {} loads field {} with num_rows {}, loaded_data_size:{}",
- this->get_segment_id(),
- field_id.get(),
- num_rows,
- loaded_field_datas_info.data_size_);
- auto& loaded_field_datas = loaded_field_datas_info.loaded_field_datas_;
+ auto& pool =
+ ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);
+
+ LOG_INFO("segment {} loads field {} with num_rows {}",
+ this->get_segment_id(),
+ field_id.get(),
+ num_rows);
+ auto load_future =
+ pool.Submit(LoadFieldDatasFromRemote, insert_files, channel);
+
+ LOG_INFO("segment {} submits load field {} task to thread pool",
+ this->get_segment_id(),
+ field_id.get());
+ auto field_data = storage::CollectFieldDataChannel(channel);
if (field_id == TimestampFieldID) {
// step 2: sort timestamp
// query node already guarantees that the timestamp is ordered, avoid field data copy in c++
// step 3: fill into Segment.ConcurrentVector
insert_record_.timestamps_.set_data_raw(reserved_offset,
- loaded_field_datas);
+ field_data);
continue;
}
@@ -271,14 +267,14 @@ SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) {
if (!indexing_record_.HasRawData(field_id)) {
if (insert_record_.is_valid_data_exist(field_id)) {
insert_record_.get_valid_data(field_id)->set_data_raw(
- loaded_field_datas);
+ field_data);
}
insert_record_.get_data_base(field_id)->set_data_raw(
- reserved_offset, loaded_field_datas);
+ reserved_offset, field_data);
}
if (segcore_config_.get_enable_interim_segment_index()) {
auto offset = reserved_offset;
- for (auto& data : loaded_field_datas) {
+ for (auto& data : field_data) {
auto row_count = data->get_num_rows();
indexing_record_.AppendingIndex(
offset, row_count, field_id, data, insert_record_);
@@ -288,7 +284,7 @@ SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) {
try_remove_chunks(field_id);
if (field_id == primary_field_id) {
- insert_record_.insert_pks(loaded_field_datas);
+ insert_record_.insert_pks(field_data);
}
// update average row data size
@@ -297,13 +293,13 @@ SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) {
SegmentInternalInterface::set_field_avg_size(
field_id,
num_rows,
- storage::GetByteSizeOfFieldDatas(loaded_field_datas));
+ storage::GetByteSizeOfFieldDatas(field_data));
}
// build text match index
if (field_meta.enable_match()) {
auto index = GetTextIndex(field_id);
- index->BuildIndexFromFieldData(loaded_field_datas,
+ index->BuildIndexFromFieldData(field_data,
field_meta.is_nullable());
index->Commit();
// Reload reader so that the index can be read immediately
@@ -313,20 +309,18 @@ SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) {
// build json match index
if (field_meta.enable_growing_jsonStats()) {
auto index = GetJsonKeyIndex(field_id);
- index->BuildWithFieldData(loaded_field_datas,
- field_meta.is_nullable());
+ index->BuildWithFieldData(field_data, field_meta.is_nullable());
index->Commit();
// Reload reader so that the index can be read immediately
index->Reload();
}
// update the mem size
- stats_.mem_size += storage::GetByteSizeOfFieldDatas(loaded_field_datas);
+ stats_.mem_size += storage::GetByteSizeOfFieldDatas(field_data);
- LOG_INFO("segment {} loads field {} done, recovering:{}",
+ LOG_INFO("segment {} loads field {} done",
this->get_segment_id(),
- field_id.get(),
- infos.recovering);
+ field_id.get());
}
// step 5: update small indexes
diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp
index 93b6edc4ad..df5b13b209 100644
--- a/internal/core/src/segcore/SegmentSealedImpl.cpp
+++ b/internal/core/src/segcore/SegmentSealedImpl.cpp
@@ -320,17 +320,14 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& load_info) {
auto parallel_degree = static_cast<uint64_t>(
DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
field_data_info.channel->set_capacity(parallel_degree * 2);
- auto priority = load_info.recovering ? milvus::ThreadPoolPriority::HIGH
- : milvus::ThreadPoolPriority::LOW;
- LoadFieldDatasFromRemote(
- insert_files, field_data_info.channel, priority);
+ auto& pool =
+ ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);
+ pool.Submit(
+ LoadFieldDatasFromRemote, insert_files, field_data_info.channel);
- LOG_INFO(
- "segment {} submits load field {} task to thread pool, "
- "recovering:{}",
- this->get_segment_id(),
- field_id.get(),
- load_info.recovering);
+ LOG_INFO("segment {} submits load field {} task to thread pool",
+ this->get_segment_id(),
+ field_id.get());
bool use_mmap = false;
if (!info.enable_mmap ||
SystemProperty::Instance().IsSystem(field_id)) {
diff --git a/internal/core/src/segcore/Types.h b/internal/core/src/segcore/Types.h
index a8751722a4..c1cf8e524a 100644
--- a/internal/core/src/segcore/Types.h
+++ b/internal/core/src/segcore/Types.h
@@ -51,8 +51,6 @@ struct LoadIndexInfo {
// (aka. the filesize before loading operation at knowhere),
// because the uncompressed-index-file-size may not be stored at previous milvus.
// so the size may be not accurate (generated by the compressed-index-file-size multiplied with a compress-ratio)
- bool
- recovering; //indicator to separate recovering load and normal load, to reduce influence for front-end search&&query
};
} // namespace milvus::segcore
diff --git a/internal/core/src/segcore/Utils.cpp b/internal/core/src/segcore/Utils.cpp
index 84c2f1b82f..a04e62671f 100644
--- a/internal/core/src/segcore/Utils.cpp
+++ b/internal/core/src/segcore/Utils.cpp
@@ -869,12 +869,12 @@ ReverseDataFromIndex(const index::IndexBase* index,
// segcore use default remote chunk manager to load data from minio/s3
void
LoadArrowReaderFromRemote(const std::vector<std::string>& remote_files,
- std::shared_ptr<milvus::ArrowReaderChannel> channel,
- milvus::ThreadPoolPriority priority) {
+ std::shared_ptr<milvus::ArrowReaderChannel> channel) {
try {
auto rcm = storage::RemoteChunkManagerSingleton::GetInstance()
.GetRemoteChunkManager();
- auto& pool = ThreadPools::GetThreadPool(priority);
+ auto& pool = ThreadPools::GetThreadPool(ThreadPoolPriority::HIGH);
+
std::vector<std::future<std::shared_ptr<milvus::ArrowDataWrapper>>>
futures;
futures.reserve(remote_files.size());
@@ -905,12 +905,12 @@ LoadArrowReaderFromRemote(const std::vector& remote_files,
void
LoadFieldDatasFromRemote(const std::vector<std::string>& remote_files,
- FieldDataChannelPtr channel,
- milvus::ThreadPoolPriority priority) {
+ FieldDataChannelPtr channel) {
try {
auto rcm = storage::RemoteChunkManagerSingleton::GetInstance()
.GetRemoteChunkManager();
- auto& pool = ThreadPools::GetThreadPool(priority);
+ auto& pool = ThreadPools::GetThreadPool(ThreadPoolPriority::HIGH);
+
std::vector<std::future<FieldDataPtr>> futures;
futures.reserve(remote_files.size());
for (const auto& file : remote_files) {
@@ -923,10 +923,12 @@ LoadFieldDatasFromRemote(const std::vector& remote_files,
});
futures.emplace_back(std::move(future));
}
+
for (auto& future : futures) {
auto field_data = future.get();
channel->push(field_data);
}
+
channel->close();
} catch (std::exception& e) {
LOG_INFO("failed to load data from remote: {}", e.what());
diff --git a/internal/core/src/segcore/Utils.h b/internal/core/src/segcore/Utils.h
index bb77e2fba1..a946fae20b 100644
--- a/internal/core/src/segcore/Utils.h
+++ b/internal/core/src/segcore/Utils.h
@@ -28,7 +28,6 @@
#include "log/Log.h"
#include "segcore/DeletedRecord.h"
#include "segcore/InsertRecord.h"
-#include "storage/ThreadPools.h"
namespace milvus::segcore {
@@ -116,16 +115,12 @@ ReverseDataFromIndex(const index::IndexBase* index,
const FieldMeta& field_meta);
void
-LoadArrowReaderFromRemote(
- const std::vector<std::string>& remote_files,
- std::shared_ptr<milvus::ArrowReaderChannel> channel,
- milvus::ThreadPoolPriority priority = milvus::ThreadPoolPriority::LOW);
+LoadArrowReaderFromRemote(const std::vector<std::string>& remote_files,
+ std::shared_ptr<milvus::ArrowReaderChannel> channel);
void
-LoadFieldDatasFromRemote(
- const std::vector<std::string>& remote_files,
- FieldDataChannelPtr channel,
- milvus::ThreadPoolPriority priority = milvus::ThreadPoolPriority::LOW);
+LoadFieldDatasFromRemote(const std::vector<std::string>& remote_files,
+ FieldDataChannelPtr channel);
/**
* Returns an index pointing to the first element in the range [first, last) such that `value < element` is true
* (i.e. that is strictly greater than value), or last if no such element is found.
diff --git a/internal/core/src/segcore/load_field_data_c.cpp b/internal/core/src/segcore/load_field_data_c.cpp
index d9d415c64c..ddab5a28f4 100644
--- a/internal/core/src/segcore/load_field_data_c.cpp
+++ b/internal/core/src/segcore/load_field_data_c.cpp
@@ -109,9 +109,3 @@ EnableMmap(CLoadFieldDataInfo c_load_field_data_info,
auto info = static_cast(c_load_field_data_info);
info->field_infos[field_id].enable_mmap = enabled;
}
-
-void
-SetRecovering(CLoadFieldDataInfo c_load_field_data_info, bool recovering) {
- auto info = static_cast(c_load_field_data_info);
- info->recovering = recovering;
-}
diff --git a/internal/core/src/segcore/load_field_data_c.h b/internal/core/src/segcore/load_field_data_c.h
index 9e4a8378fa..0b3b7dee9f 100644
--- a/internal/core/src/segcore/load_field_data_c.h
+++ b/internal/core/src/segcore/load_field_data_c.h
@@ -58,9 +58,6 @@ EnableMmap(CLoadFieldDataInfo c_load_field_data_info,
int64_t field_id,
bool enabled);
-void
-SetRecovering(CLoadFieldDataInfo c_load_field_data_info, bool recovering);
-
#ifdef __cplusplus
}
#endif
diff --git a/internal/core/src/segcore/load_index_c.cpp b/internal/core/src/segcore/load_index_c.cpp
index f82fc5cee1..b847c4d297 100644
--- a/internal/core/src/segcore/load_index_c.cpp
+++ b/internal/core/src/segcore/load_index_c.cpp
@@ -30,7 +30,6 @@
#include "pb/cgo_msg.pb.h"
#include "knowhere/index/index_static.h"
#include "knowhere/comp/knowhere_check.h"
-#include "storage/ThreadPools.h"
bool
IsLoadWithDisk(const char* index_type, int index_engine_version) {
@@ -269,14 +268,12 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) {
milvus::tracer::SetRootSpan(span);
LOG_INFO(
- "[collection={}][segment={}][field={}][enable_mmap={}][recovering={"
- "}] load index "
+ "[collection={}][segment={}][field={}][enable_mmap={}] load index "
"{}",
load_index_info->collection_id,
load_index_info->segment_id,
load_index_info->field_id,
load_index_info->enable_mmap,
- load_index_info->recovering,
load_index_info->index_id);
// get index type
@@ -336,9 +333,6 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) {
config[milvus::index::ENABLE_MMAP] = "true";
config[milvus::index::MMAP_FILE_PATH] = filepath.string();
}
- config[milvus::THREAD_POOL_PRIORITY] =
- load_index_info->recovering ? milvus::ThreadPoolPriority::HIGH
- : milvus::ThreadPoolPriority::LOW;
LOG_DEBUG("load index with configs: {}", config.dump());
load_index_info->index->Load(ctx, config);
@@ -347,14 +341,12 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) {
milvus::tracer::CloseRootSpan();
LOG_INFO(
- "[collection={}][segment={}][field={}][enable_mmap={}][recovering={"
- "}] load index "
+ "[collection={}][segment={}][field={}][enable_mmap={}] load index "
"{} done",
load_index_info->collection_id,
load_index_info->segment_id,
load_index_info->field_id,
load_index_info->enable_mmap,
- load_index_info->recovering,
load_index_info->index_id);
auto status = CStatus();
@@ -502,7 +494,6 @@ FinishLoadIndexInfo(CLoadIndexInfo c_load_index_info,
info_proto->index_engine_version();
load_index_info->schema = info_proto->field();
load_index_info->index_size = info_proto->index_file_size();
- load_index_info->recovering = info_proto->recovering();
}
auto status = CStatus();
status.error_code = milvus::Success;
diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp
index 227ea01d9f..e4c002d0f1 100644
--- a/internal/core/src/segcore/segment_c.cpp
+++ b/internal/core/src/segcore/segment_c.cpp
@@ -475,9 +475,7 @@ LoadTextIndex(CSegmentInterface c_segment,
files.push_back(f);
}
config["index_files"] = files;
- config[milvus::THREAD_POOL_PRIORITY] =
- info_proto->recovering() ? milvus::ThreadPoolPriority::HIGH
- : milvus::ThreadPoolPriority::LOW;
+
milvus::storage::FileManagerContext ctx(
field_meta, index_meta, remote_chunk_manager);
@@ -530,9 +528,6 @@ LoadJsonKeyIndex(CTraceContext c_trace,
files.push_back(f);
}
config["index_files"] = files;
- config[milvus::THREAD_POOL_PRIORITY] =
- info_proto->recovering() ? milvus::ThreadPoolPriority::HIGH
- : milvus::ThreadPoolPriority::LOW;
milvus::storage::FileManagerContext file_ctx(
field_meta, index_meta, remote_chunk_manager);
diff --git a/internal/core/src/storage/DiskFileManagerImpl.cpp b/internal/core/src/storage/DiskFileManagerImpl.cpp
index 191c452920..6713b14930 100644
--- a/internal/core/src/storage/DiskFileManagerImpl.cpp
+++ b/internal/core/src/storage/DiskFileManagerImpl.cpp
@@ -220,8 +220,7 @@ DiskFileManagerImpl::AddBatchIndexFiles(
void
DiskFileManagerImpl::CacheIndexToDiskInternal(
const std::vector& remote_files,
- const std::function& get_local_index_prefix,
- milvus::ThreadPoolPriority priority) noexcept {
+ const std::function& get_local_index_prefix) noexcept {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
@@ -261,8 +260,7 @@ DiskFileManagerImpl::CacheIndexToDiskInternal(
uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
auto appendIndexFiles = [&]() {
- auto index_chunks =
- GetObjectData(rcm_.get(), batch_remote_files, priority);
+ auto index_chunks = GetObjectData(rcm_.get(), batch_remote_files);
for (auto& chunk : index_chunks) {
auto index_data = chunk.get()->GetFieldData();
auto index_size = index_data->DataSize();
@@ -290,36 +288,28 @@ DiskFileManagerImpl::CacheIndexToDiskInternal(
void
DiskFileManagerImpl::CacheIndexToDisk(
- const std::vector& remote_files,
- milvus::ThreadPoolPriority priority) {
+ const std::vector& remote_files) {
return CacheIndexToDiskInternal(
- remote_files,
- [this]() { return GetLocalIndexObjectPrefix(); },
- priority);
+ remote_files, [this]() { return GetLocalIndexObjectPrefix(); });
}
void
DiskFileManagerImpl::CacheTextLogToDisk(
- const std::vector& remote_files,
- milvus::ThreadPoolPriority priority) {
+ const std::vector& remote_files) {
return CacheIndexToDiskInternal(
- remote_files, [this]() { return GetLocalTextIndexPrefix(); }, priority);
+ remote_files, [this]() { return GetLocalTextIndexPrefix(); });
}
void
DiskFileManagerImpl::CacheJsonKeyIndexToDisk(
- const std::vector& remote_files,
- milvus::ThreadPoolPriority priority) {
+ const std::vector& remote_files) {
return CacheIndexToDiskInternal(
- remote_files,
- [this]() { return GetLocalJsonKeyIndexPrefix(); },
- priority);
+ remote_files, [this]() { return GetLocalJsonKeyIndexPrefix(); });
}
template
std::string
-DiskFileManagerImpl::CacheRawDataToDisk(std::vector remote_files,
- milvus::ThreadPoolPriority priority) {
+DiskFileManagerImpl::CacheRawDataToDisk(std::vector remote_files) {
SortByPath(remote_files);
auto segment_id = GetFieldDataMeta().segment_id;
@@ -351,7 +341,7 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector remote_files,
int64_t write_offset = sizeof(num_rows) + sizeof(dim);
auto FetchRawData = [&]() {
- auto field_datas = GetObjectData(rcm_.get(), batch_files, priority);
+ auto field_datas = GetObjectData(rcm_.get(), batch_files);
int batch_size = batch_files.size();
for (int i = 0; i < batch_size; ++i) {
auto field_data = field_datas[i].get()->GetFieldData();
@@ -704,18 +694,14 @@ DiskFileManagerImpl::IsExisted(const std::string& file) noexcept {
template std::string
DiskFileManagerImpl::CacheRawDataToDisk(
- std::vector remote_files,
- milvus::ThreadPoolPriority priority = milvus::ThreadPoolPriority::LOW);
+ std::vector remote_files);
template std::string
DiskFileManagerImpl::CacheRawDataToDisk(
- std::vector remote_files,
- milvus::ThreadPoolPriority priority = milvus::ThreadPoolPriority::LOW);
+ std::vector remote_files);
template std::string
DiskFileManagerImpl::CacheRawDataToDisk(
- std::vector remote_files,
- milvus::ThreadPoolPriority priority = milvus::ThreadPoolPriority::LOW);
+ std::vector remote_files);
template std::string
DiskFileManagerImpl::CacheRawDataToDisk(
- std::vector remote_files,
- milvus::ThreadPoolPriority priority = milvus::ThreadPoolPriority::LOW);
+ std::vector remote_files);
} // namespace milvus::storage
diff --git a/internal/core/src/storage/DiskFileManagerImpl.h b/internal/core/src/storage/DiskFileManagerImpl.h
index f603e2d3dd..896c2b991e 100644
--- a/internal/core/src/storage/DiskFileManagerImpl.h
+++ b/internal/core/src/storage/DiskFileManagerImpl.h
@@ -26,7 +26,6 @@
#include "storage/FileManager.h"
#include "storage/ChunkManager.h"
#include "common/Consts.h"
-#include "storage/ThreadPools.h"
namespace milvus::storage {
@@ -103,19 +102,13 @@ class DiskFileManagerImpl : public FileManagerImpl {
}
void
- CacheIndexToDisk(
- const std::vector& remote_files,
- milvus::ThreadPoolPriority priority = milvus::ThreadPoolPriority::LOW);
+ CacheIndexToDisk(const std::vector& remote_files);
void
- CacheTextLogToDisk(
- const std::vector& remote_files,
- milvus::ThreadPoolPriority priority = milvus::ThreadPoolPriority::LOW);
+ CacheTextLogToDisk(const std::vector& remote_files);
void
- CacheJsonKeyIndexToDisk(
- const std::vector& remote_files,
- milvus::ThreadPoolPriority priority = milvus::ThreadPoolPriority::LOW);
+ CacheJsonKeyIndexToDisk(const std::vector& remote_files);
void
AddBatchIndexFiles(const std::string& local_file_name,
@@ -125,9 +118,7 @@ class DiskFileManagerImpl : public FileManagerImpl {
template
std::string
- CacheRawDataToDisk(
- std::vector remote_files,
- milvus::ThreadPoolPriority priority = milvus::ThreadPoolPriority::LOW);
+ CacheRawDataToDisk(std::vector remote_files);
std::string
CacheOptFieldToDisk(OptFieldT& fields_map);
@@ -168,9 +159,7 @@ class DiskFileManagerImpl : public FileManagerImpl {
void
CacheIndexToDiskInternal(
const std::vector& remote_files,
- const std::function& get_local_index_prefix,
- milvus::ThreadPoolPriority priority =
- milvus::ThreadPoolPriority::LOW) noexcept;
+ const std::function& get_local_index_prefix) noexcept;
private:
// local file path (abs path)
diff --git a/internal/core/src/storage/MemFileManagerImpl.cpp b/internal/core/src/storage/MemFileManagerImpl.cpp
index f0096a7cb7..3f21781bc4 100644
--- a/internal/core/src/storage/MemFileManagerImpl.cpp
+++ b/internal/core/src/storage/MemFileManagerImpl.cpp
@@ -98,15 +98,14 @@ MemFileManagerImpl::LoadFile(const std::string& filename) noexcept {
std::map
MemFileManagerImpl::LoadIndexToMemory(
- const std::vector& remote_files,
- milvus::ThreadPoolPriority priority) {
+ const std::vector& remote_files) {
std::map file_to_index_data;
auto parallel_degree =
static_cast(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
std::vector batch_files;
auto LoadBatchIndexFiles = [&]() {
- auto index_datas = GetObjectData(rcm_.get(), batch_files, priority);
+ auto index_datas = GetObjectData(rcm_.get(), batch_files);
for (size_t idx = 0; idx < batch_files.size(); ++idx) {
auto file_name =
batch_files[idx].substr(batch_files[idx].find_last_of('/') + 1);
diff --git a/internal/core/src/storage/MemFileManagerImpl.h b/internal/core/src/storage/MemFileManagerImpl.h
index 5f47bd56ad..13bd219ec3 100644
--- a/internal/core/src/storage/MemFileManagerImpl.h
+++ b/internal/core/src/storage/MemFileManagerImpl.h
@@ -26,7 +26,6 @@
#include "storage/IndexData.h"
#include "storage/FileManager.h"
#include "storage/ChunkManager.h"
-#include "storage/ThreadPools.h"
namespace milvus::storage {
@@ -53,8 +52,7 @@ class MemFileManagerImpl : public FileManagerImpl {
}
std::map
- LoadIndexToMemory(const std::vector& remote_files,
- milvus::ThreadPoolPriority priority);
+ LoadIndexToMemory(const std::vector& remote_files);
std::vector
CacheRawDataToMemory(std::vector remote_files);
diff --git a/internal/core/src/storage/ThreadPools.h b/internal/core/src/storage/ThreadPools.h
index 9e0b4e2765..a728befabb 100644
--- a/internal/core/src/storage/ThreadPools.h
+++ b/internal/core/src/storage/ThreadPools.h
@@ -22,11 +22,10 @@
namespace milvus {
-constexpr const char* THREAD_POOL_PRIORITY = "priority";
-
enum ThreadPoolPriority {
HIGH = 0,
- LOW = 1,
+ MIDDLE = 1,
+ LOW = 2,
};
class ThreadPools {
@@ -41,14 +40,17 @@ class ThreadPools {
private:
ThreadPools() {
name_map[HIGH] = "high_priority_thread_pool";
+ name_map[MIDDLE] = "middle_priority_thread_pool";
name_map[LOW] = "low_priority_thread_pool";
}
static void
SetUpCoefficients() {
coefficient_map[HIGH] = HIGH_PRIORITY_THREAD_CORE_COEFFICIENT;
+ coefficient_map[MIDDLE] = MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
coefficient_map[LOW] = LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
- LOG_INFO("Init ThreadPools, high_priority_co={}, low={}",
+ LOG_INFO("Init ThreadPools, high_priority_co={}, middle={}, low={}",
HIGH_PRIORITY_THREAD_CORE_COEFFICIENT,
+ MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT,
LOW_PRIORITY_THREAD_CORE_COEFFICIENT);
}
void
diff --git a/internal/core/src/storage/Util.cpp b/internal/core/src/storage/Util.cpp
index 042f127967..57a2d44b91 100644
--- a/internal/core/src/storage/Util.cpp
+++ b/internal/core/src/storage/Util.cpp
@@ -639,9 +639,8 @@ EncodeAndUploadFieldSlice(ChunkManager* chunk_manager,
std::vector>>
GetObjectData(ChunkManager* remote_chunk_manager,
- const std::vector& remote_files,
- milvus::ThreadPoolPriority priority) {
- auto& pool = ThreadPools::GetThreadPool(priority);
+ const std::vector& remote_files) {
+ auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH);
std::vector>> futures;
futures.reserve(remote_files.size());
for (auto& file : remote_files) {
@@ -658,7 +657,7 @@ PutIndexData(ChunkManager* remote_chunk_manager,
const std::vector& slice_names,
FieldDataMeta& field_meta,
IndexMeta& index_meta) {
- auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::LOW);
+ auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);
std::vector>> futures;
AssertInfo(data_slices.size() == slice_sizes.size(),
"inconsistent data slices size {} with slice sizes {}",
@@ -859,20 +858,6 @@ GetByteSizeOfFieldDatas(const std::vector& field_datas) {
return result;
}
-LoadedFieldDatasInfo
-CollectFieldDataChannelWithInfos(FieldDataChannelPtr& channel) {
- std::vector result;
- FieldDataPtr field_data;
- int64_t field_data_size = 0;
- int64_t field_data_rows = 0;
- while (channel->pop(field_data)) {
- result.push_back(field_data);
- field_data_size += field_data->DataSize();
- field_data_rows += field_data->get_num_rows();
- }
- return {result, field_data_size, field_data_rows};
-}
-
std::vector
CollectFieldDataChannel(FieldDataChannelPtr& channel) {
std::vector result;
diff --git a/internal/core/src/storage/Util.h b/internal/core/src/storage/Util.h
index ea1db6c9cc..511656220d 100644
--- a/internal/core/src/storage/Util.h
+++ b/internal/core/src/storage/Util.h
@@ -31,7 +31,6 @@
#include "storage/ChunkManager.h"
#include "storage/DataCodec.h"
#include "storage/Types.h"
-#include "storage/ThreadPools.h"
namespace milvus::storage {
@@ -140,10 +139,8 @@ EncodeAndUploadFieldSlice(ChunkManager* chunk_manager,
std::string object_key);
std::vector>>
-GetObjectData(
- ChunkManager* remote_chunk_manager,
- const std::vector& remote_files,
- milvus::ThreadPoolPriority priority = milvus::ThreadPoolPriority::LOW);
+GetObjectData(ChunkManager* remote_chunk_manager,
+ const std::vector& remote_files);
std::map
PutIndexData(ChunkManager* remote_chunk_manager,
@@ -177,15 +174,6 @@ CreateFieldData(const DataType& type,
int64_t
GetByteSizeOfFieldDatas(const std::vector& field_datas);
-struct LoadedFieldDatasInfo {
- std::vector loaded_field_datas_;
- int64_t data_size_;
- int64_t data_rows_;
-};
-
-LoadedFieldDatasInfo
-CollectFieldDataChannelWithInfos(FieldDataChannelPtr& channel);
-
std::vector
CollectFieldDataChannel(FieldDataChannelPtr& channel);
diff --git a/internal/core/unittest/test_array_bitmap_index.cpp b/internal/core/unittest/test_array_bitmap_index.cpp
index 7597d14fb4..66205b1139 100644
--- a/internal/core/unittest/test_array_bitmap_index.cpp
+++ b/internal/core/unittest/test_array_bitmap_index.cpp
@@ -249,7 +249,6 @@ class ArrayBitmapIndexTest : public testing::Test {
index_info.field_type = DataType::ARRAY;
config["index_files"] = index_files;
- config[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
ctx.set_for_loading_index(true);
index_ =
diff --git a/internal/core/unittest/test_bitmap_index.cpp b/internal/core/unittest/test_bitmap_index.cpp
index afe0b31ddb..078546ae86 100644
--- a/internal/core/unittest/test_bitmap_index.cpp
+++ b/internal/core/unittest/test_bitmap_index.cpp
@@ -167,7 +167,6 @@ class BitmapIndexTest : public testing::Test {
field_id);
;
}
- config[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
index_ =
index::IndexFactory::GetInstance().CreateIndex(index_info, ctx);
index_->Load(milvus::tracer::TraceContext{}, config);
diff --git a/internal/core/unittest/test_disk_file_manager_test.cpp b/internal/core/unittest/test_disk_file_manager_test.cpp
index e6d58bac5d..565e063b6d 100644
--- a/internal/core/unittest/test_disk_file_manager_test.cpp
+++ b/internal/core/unittest/test_disk_file_manager_test.cpp
@@ -94,8 +94,7 @@ TEST_F(DiskAnnFileManagerTest, AddFilePositiveParallel) {
std::cout << file2size.first << std::endl;
remote_files.emplace_back(file2size.first);
}
- diskAnnFileManager->CacheIndexToDisk(remote_files,
- milvus::ThreadPoolPriority::HIGH);
+ diskAnnFileManager->CacheIndexToDisk(remote_files);
auto local_files = diskAnnFileManager->GetLocalFilePaths();
for (auto& file : local_files) {
auto file_size = lcm->Size(file);
diff --git a/internal/core/unittest/test_hybrid_index.cpp b/internal/core/unittest/test_hybrid_index.cpp
index 906782c4a4..fb95b2c13f 100644
--- a/internal/core/unittest/test_hybrid_index.cpp
+++ b/internal/core/unittest/test_hybrid_index.cpp
@@ -162,7 +162,6 @@ class HybridIndexTestV1 : public testing::Test {
index_info.field_type = type_;
config["index_files"] = index_files;
- config[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
ctx.set_for_loading_index(true);
index_ =
diff --git a/internal/core/unittest/test_indexing.cpp b/internal/core/unittest/test_indexing.cpp
index ce2c065eca..1b29a22a0c 100644
--- a/internal/core/unittest/test_indexing.cpp
+++ b/internal/core/unittest/test_indexing.cpp
@@ -500,7 +500,6 @@ TEST_P(IndexTest, BuildAndQuery) {
ASSERT_GT(serializedSize, 0);
load_conf = generate_load_conf(index_type, metric_type, 0);
load_conf["index_files"] = index_files;
- load_conf[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
ASSERT_NO_THROW(vec_index->Load(milvus::tracer::TraceContext{}, load_conf));
EXPECT_EQ(vec_index->Count(), NB);
if (!is_sparse) {
@@ -568,7 +567,6 @@ TEST_P(IndexTest, Mmap) {
load_conf = generate_load_conf(index_type, metric_type, 0);
load_conf["index_files"] = index_files;
load_conf["mmap_filepath"] = "mmap/test_index_mmap_" + index_type;
- load_conf[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
vec_index->Load(milvus::tracer::TraceContext{}, load_conf);
EXPECT_EQ(vec_index->Count(), NB);
EXPECT_EQ(vec_index->GetDim(), is_sparse ? kTestSparseDim : DIM);
@@ -624,7 +622,6 @@ TEST_P(IndexTest, GetVector) {
load_conf["index_files"] = index_files;
vec_index = dynamic_cast(new_index.get());
- load_conf[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
vec_index->Load(milvus::tracer::TraceContext{}, load_conf);
if (!is_sparse) {
EXPECT_EQ(vec_index->GetDim(), DIM);
@@ -734,7 +731,6 @@ TEST(Indexing, SearchDiskAnnWithInvalidParam) {
auto vec_index = dynamic_cast(new_index.get());
auto load_conf = generate_load_conf(index_type, metric_type, NB);
load_conf["index_files"] = index_files;
- load_conf[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
vec_index->Load(milvus::tracer::TraceContext{}, load_conf);
EXPECT_EQ(vec_index->Count(), NB);
@@ -819,7 +815,6 @@ TEST(Indexing, SearchDiskAnnWithFloat16) {
auto vec_index = dynamic_cast(new_index.get());
auto load_conf = generate_load_conf(index_type, metric_type, NB);
load_conf["index_files"] = index_files;
- load_conf[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
vec_index->Load(milvus::tracer::TraceContext{}, load_conf);
EXPECT_EQ(vec_index->Count(), NB);
@@ -903,7 +898,6 @@ TEST(Indexing, SearchDiskAnnWithBFloat16) {
auto vec_index = dynamic_cast(new_index.get());
auto load_conf = generate_load_conf(index_type, metric_type, NB);
load_conf["index_files"] = index_files;
- load_conf[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
vec_index->Load(milvus::tracer::TraceContext{}, load_conf);
EXPECT_EQ(vec_index->Count(), NB);
diff --git a/internal/core/unittest/test_inverted_index.cpp b/internal/core/unittest/test_inverted_index.cpp
index 0715be3f54..8da7f2c078 100644
--- a/internal/core/unittest/test_inverted_index.cpp
+++ b/internal/core/unittest/test_inverted_index.cpp
@@ -208,7 +208,7 @@ test_run() {
Config config;
config["index_files"] = index_files;
- config[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
+
ctx.set_for_loading_index(true);
auto index =
index::IndexFactory::GetInstance().CreateIndex(index_info, ctx);
@@ -483,7 +483,7 @@ test_string() {
Config config;
config["index_files"] = index_files;
- config[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
+
ctx.set_for_loading_index(true);
auto index =
index::IndexFactory::GetInstance().CreateIndex(index_info, ctx);
diff --git a/internal/core/unittest/test_json_key_stats_index.cpp b/internal/core/unittest/test_json_key_stats_index.cpp
index 99470e5901..30f9b18ea9 100644
--- a/internal/core/unittest/test_json_key_stats_index.cpp
+++ b/internal/core/unittest/test_json_key_stats_index.cpp
@@ -126,7 +126,6 @@ class JsonKeyStatsIndexTest : public ::testing::TestWithParam {
index::CreateIndexInfo index_info{};
config["index_files"] = index_files;
- config[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
index_ = std::make_shared(ctx, true);
index_->Load(milvus::tracer::TraceContext{}, config);
diff --git a/internal/core/unittest/test_utils/DataGen.h b/internal/core/unittest/test_utils/DataGen.h
index 12b4d3ea0c..1282f08c28 100644
--- a/internal/core/unittest/test_utils/DataGen.h
+++ b/internal/core/unittest/test_utils/DataGen.h
@@ -1303,7 +1303,6 @@ GenVecIndexing(int64_t N,
auto create_index_result = indexing->Upload();
auto index_files = create_index_result->GetIndexFiles();
conf["index_files"] = index_files;
- conf[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
// we need a load stage to use index as the producation does
// knowhere would do some data preparation in this stage
indexing->Load(milvus::tracer::TraceContext{}, conf);
@@ -1440,4 +1439,4 @@ NewCollection(const milvus::proto::schema::CollectionSchema* schema,
return (void*)collection.release();
}
-} // namespace milvus::segcore
\ No newline at end of file
+} // namespace milvus::segcore
diff --git a/internal/core/unittest/test_utils/storage_test_utils.h b/internal/core/unittest/test_utils/storage_test_utils.h
index 541388e594..baa60c7712 100644
--- a/internal/core/unittest/test_utils/storage_test_utils.h
+++ b/internal/core/unittest/test_utils/storage_test_utils.h
@@ -124,7 +124,7 @@ PutFieldData(milvus::storage::ChunkManager* remote_chunk_manager,
FieldDataMeta& field_data_meta,
milvus::FieldMeta& field_meta) {
auto& pool =
- milvus::ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH);
+ milvus::ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);
std::vector>> futures;
AssertInfo(buffers.size() == element_counts.size(),
"inconsistent size of data slices with slice sizes!");
diff --git a/internal/datacoord/stats_task_meta.go b/internal/datacoord/stats_task_meta.go
index c376b39e1d..11be93105c 100644
--- a/internal/datacoord/stats_task_meta.go
+++ b/internal/datacoord/stats_task_meta.go
@@ -19,10 +19,9 @@ package datacoord
import (
"context"
"fmt"
- "strconv"
-
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
+ "strconv"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/pkg/v2/log"
diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go
index a88dd88e36..6b2a7f0643 100644
--- a/internal/indexnode/indexnode.go
+++ b/internal/indexnode/indexnode.go
@@ -178,6 +178,8 @@ func (i *IndexNode) initSegcore() {
// set up thread pool for different priorities
cHighPriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsInt64())
C.InitHighPriorityThreadCoreCoefficient(cHighPriorityThreadCoreCoefficient)
+ cMiddlePriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt64())
+ C.InitMiddlePriorityThreadCoreCoefficient(cMiddlePriorityThreadCoreCoefficient)
cLowPriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt64())
C.InitLowPriorityThreadCoreCoefficient(cLowPriorityThreadCoreCoefficient)
diff --git a/internal/querycoordv2/balance/balance.go b/internal/querycoordv2/balance/balance.go
index b51938652a..80d0f67d45 100644
--- a/internal/querycoordv2/balance/balance.go
+++ b/internal/querycoordv2/balance/balance.go
@@ -38,7 +38,6 @@ type SegmentAssignPlan struct {
FromScore int64
ToScore int64
SegmentScore int64
- Recovering bool
}
func (segPlan *SegmentAssignPlan) String() string {
diff --git a/internal/querycoordv2/balance/utils.go b/internal/querycoordv2/balance/utils.go
index 0dfb21c35e..18b9fd2565 100644
--- a/internal/querycoordv2/balance/utils.go
+++ b/internal/querycoordv2/balance/utils.go
@@ -52,7 +52,6 @@ func CreateSegmentTasksFromPlans(ctx context.Context, source task.Source, timeou
source,
p.Segment.GetCollectionID(),
p.Replica,
- p.Recovering,
actions...,
)
if err != nil {
@@ -74,7 +73,6 @@ func CreateSegmentTasksFromPlans(ctx context.Context, source task.Source, timeou
zap.Int64("replica", p.Replica.GetID()),
zap.String("channel", p.Segment.GetInsertChannel()),
zap.String("level", p.Segment.GetLevel().String()),
- zap.Bool("recovering", p.Recovering),
zap.Int64("from", p.From),
zap.Int64("to", p.To))
if task.GetTaskType(t) == task.TaskTypeMove {
diff --git a/internal/querycoordv2/checkers/index_checker.go b/internal/querycoordv2/checkers/index_checker.go
index 44bf275d40..7d97c7492d 100644
--- a/internal/querycoordv2/checkers/index_checker.go
+++ b/internal/querycoordv2/checkers/index_checker.go
@@ -224,7 +224,6 @@ func (c *IndexChecker) createSegmentUpdateTask(ctx context.Context, segment *met
c.ID(),
segment.GetCollectionID(),
replica,
- false,
action,
)
if err != nil {
@@ -280,7 +279,6 @@ func (c *IndexChecker) createSegmentStatsUpdateTask(ctx context.Context, segment
c.ID(),
segment.GetCollectionID(),
replica,
- false,
action,
)
if err != nil {
diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go
index 3aeb74b73b..50e638dd35 100644
--- a/internal/querycoordv2/checkers/segment_checker.go
+++ b/internal/querycoordv2/checkers/segment_checker.go
@@ -92,11 +92,7 @@ func (c *SegmentChecker) Check(ctx context.Context) []task.Task {
if c.readyToCheck(ctx, cid) {
replicas := c.meta.ReplicaManager.GetByCollection(ctx, cid)
for _, r := range replicas {
- replicaTasks, finishRecover := c.checkReplica(ctx, r)
- results = append(results, replicaTasks...)
- if r.IsRecovering() && finishRecover {
- c.meta.ReplicaManager.FinishRecoverReplica(r.GetID())
- }
+ results = append(results, c.checkReplica(ctx, r)...)
}
}
}
@@ -128,13 +124,13 @@ func (c *SegmentChecker) Check(ctx context.Context) []task.Task {
return results
}
-func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica) ([]task.Task, bool) {
+func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica) []task.Task {
ret := make([]task.Task, 0)
// compare with targets to find the lack and redundancy of segments
- lacks, recovering, redundancies, finishRecover := c.getSealedSegmentDiff(ctx, replica.GetCollectionID(), replica)
+ lacks, redundancies := c.getSealedSegmentDiff(ctx, replica.GetCollectionID(), replica.GetID())
// loadCtx := trace.ContextWithSpan(context.Background(), c.meta.GetCollection(replica.CollectionID).LoadSpan)
- tasks := c.createSegmentLoadTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), lacks, recovering, replica)
+ tasks := c.createSegmentLoadTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), lacks, replica)
task.SetReason("lacks of segment", tasks...)
task.SetPriority(task.TaskPriorityNormal, tasks...)
ret = append(ret, tasks...)
@@ -161,7 +157,7 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica
task.SetPriority(task.TaskPriorityNormal, tasks...)
ret = append(ret, tasks...)
- return ret, finishRecover
+ return ret
}
// GetGrowingSegmentDiff get streaming segment diff between leader view and target
@@ -233,8 +229,9 @@ func (c *SegmentChecker) getGrowingSegmentDiff(ctx context.Context, collectionID
func (c *SegmentChecker) getSealedSegmentDiff(
ctx context.Context,
collectionID int64,
- replica *meta.Replica,
-) (toLoad []*datapb.SegmentInfo, recovering []bool, toRelease []*meta.Segment, finishRecover bool) {
+ replicaID int64,
+) (toLoad []*datapb.SegmentInfo, toRelease []*meta.Segment) {
+ replica := c.meta.Get(ctx, replicaID)
if replica == nil {
log.Info("replica does not exist, skip it")
return
@@ -285,31 +282,26 @@ func (c *SegmentChecker) getSealedSegmentDiff(
nextTargetExist := c.targetMgr.IsNextTargetExist(ctx, collectionID)
nextTargetMap := c.targetMgr.GetSealedSegmentsByCollection(ctx, collectionID, meta.NextTarget)
currentTargetMap := c.targetMgr.GetSealedSegmentsByCollection(ctx, collectionID, meta.CurrentTarget)
- finishRecover = currentTargetMap != nil || nextTargetMap != nil
- // l0 Segment which exist on current target, but not on dist
- for _, segment := range currentTargetMap {
- if isSegmentLack(segment) {
- toLoad = append(toLoad, segment)
- recovering = append(recovering, true)
- finishRecover = false
- // for segments lacked due to node down, we need to load it under recovering mode
- }
- }
// Segment which exist on next target, but not on dist
for _, segment := range nextTargetMap {
- // to avoid generate duplicate segment task
- if currentTargetMap[segment.ID] != nil {
- continue
- }
if isSegmentLack(segment) {
toLoad = append(toLoad, segment)
- recovering = append(recovering, replica.IsRecovering())
- finishRecover = finishRecover && !replica.IsRecovering()
- // for segments lacked due to normal target advance, we load them under normal mode
- // for segments lacked due to whole instance restart, we load them under recover mode
}
}
+
+ // l0 Segment which exist on current target, but not on dist
+ for _, segment := range currentTargetMap {
+ // to avoid generate duplicate segment task
+ if nextTargetMap[segment.ID] != nil {
+ continue
+ }
+
+ if isSegmentLack(segment) {
+ toLoad = append(toLoad, segment)
+ }
+ }
+
// get segment which exist on dist, but not on current target and next target
for _, segment := range dist {
_, existOnCurrent := currentTargetMap[segment.GetID()]
@@ -329,9 +321,8 @@ func (c *SegmentChecker) getSealedSegmentDiff(
// to make sure all L0 delta logs will be delivered to the other segments.
if len(level0Segments) > 0 {
toLoad = level0Segments
- recovering = nil
- // for any l0 segments, we try to load them as soon as possible
}
+
return
}
@@ -418,26 +409,10 @@ func (c *SegmentChecker) filterSegmentInUse(ctx context.Context, replica *meta.R
return filtered
}
-func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []*datapb.SegmentInfo, recovering []bool, replica *meta.Replica) []task.Task {
+func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []*datapb.SegmentInfo, replica *meta.Replica) []task.Task {
if len(segments) == 0 {
return nil
}
- // set up recover map
- if recovering != nil && len(recovering) != len(segments) {
- // this branch should never be reached
- log.Warn("Recovering slice has different size with segment size, this should never happen",
- zap.Int("recover_len", len(recovering)),
- zap.Int("segments_len", len(segments)))
- return nil
- }
- recoverMap := make(map[int64]bool, len(segments))
- for i, segment := range segments {
- if recovering == nil {
- recoverMap[segment.GetID()] = true
- } else {
- recoverMap[segment.GetID()] = recovering[i]
- }
- }
isLevel0 := segments[0].GetLevel() == datapb.SegmentLevel_L0
shardSegments := lo.GroupBy(segments, func(s *datapb.SegmentInfo) string {
@@ -470,7 +445,6 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []
shardPlans := c.getBalancerFunc().AssignSegment(ctx, replica.GetCollectionID(), segmentInfos, rwNodes, true)
for i := range shardPlans {
shardPlans[i].Replica = replica
- shardPlans[i].Recovering = recoverMap[shardPlans[i].Segment.GetID()]
}
plans = append(plans, shardPlans...)
}
@@ -488,7 +462,6 @@ func (c *SegmentChecker) createSegmentReduceTasks(ctx context.Context, segments
c.ID(),
s.GetCollectionID(),
replica,
- false,
action,
)
if err != nil {
diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go
index 694ba38492..a94a0e5727 100644
--- a/internal/querycoordv2/checkers/segment_checker_test.go
+++ b/internal/querycoordv2/checkers/segment_checker_test.go
@@ -36,10 +36,8 @@ import (
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/kv"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
- "github.com/milvus-io/milvus/pkg/v2/proto/querypb"
"github.com/milvus-io/milvus/pkg/v2/util/etcd"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
- "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
type SegmentCheckerTestSuite struct {
@@ -172,146 +170,6 @@ func (suite *SegmentCheckerTestSuite) TestLoadSegments() {
suite.Len(tasks, 1)
}
-func (suite *SegmentCheckerTestSuite) TestRecoverReplica() {
- ctx := context.Background()
- checker := suite.checker
- // set meta
- checker.meta.CollectionManager.PutCollection(ctx, utils.CreateTestCollection(1, 1))
- checker.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(1, 1))
- // set up a recovering replica
- nodes := []int64{1, 2}
- replica := meta.NewRecoveringReplica(
- &querypb.Replica{
- ID: 1,
- CollectionID: 1,
- Nodes: nodes,
- ResourceGroup: meta.DefaultResourceGroupName,
- },
- typeutil.NewUniqueSet(nodes...),
- )
- checker.meta.ReplicaManager.Put(ctx, replica)
- // set up nodes
- suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
- NodeID: 1,
- Address: "localhost",
- Hostname: "localhost",
- }))
- suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
- NodeID: 2,
- Address: "localhost",
- Hostname: "localhost",
- }))
- checker.meta.ResourceManager.HandleNodeUp(ctx, 1)
- checker.meta.ResourceManager.HandleNodeUp(ctx, 2)
-
- // set target
- segments := []*datapb.SegmentInfo{
- {
- ID: 1,
- PartitionID: 1,
- InsertChannel: "test-insert-channel",
- },
- }
-
- channels := []*datapb.VchannelInfo{
- {
- CollectionID: 1,
- ChannelName: "test-insert-channel",
- },
- }
-
- suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
- channels, segments, nil)
- checker.targetMgr.UpdateCollectionNextTarget(ctx, int64(1))
-
- // set dist
- checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
- checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}))
-
- tasks := checker.Check(context.TODO())
- suite.Len(tasks, 1)
- suite.Len(tasks[0].Actions(), 1)
- action, ok := tasks[0].Actions()[0].(*task.SegmentAction)
- suite.True(ok)
- suite.EqualValues(1, tasks[0].ReplicaID())
- suite.Equal(task.ActionTypeGrow, action.Type())
- suite.EqualValues(1, action.GetSegmentID())
- suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
- // generated task should be run under recovering mode
- // for recovering replica, recovering segment is added based on next target
- suite.True(tasks[0].(*task.SegmentTask).Recovering())
-
- // update dist for loading finished
- segmentsInfos := make([]*meta.Segment, 0)
- segmentsInfos = append(segmentsInfos, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel"))
- checker.dist.SegmentDistManager.Update(2, segmentsInfos...)
- tasks = checker.Check(context.TODO())
- suite.Len(tasks, 0)
- newReplica := checker.meta.ReplicaManager.Get(context.TODO(), 1)
- suite.False(newReplica.IsRecovering()) // Recover should be finished
-}
-
-func (suite *SegmentCheckerTestSuite) TestRecoverQueryNodes() {
- ctx := context.Background()
- checker := suite.checker
- // set meta
- checker.meta.CollectionManager.PutCollection(ctx, utils.CreateTestCollection(1, 1))
- checker.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(1, 1))
- // set up a normal running replica, not recovering
- checker.meta.ReplicaManager.Put(ctx, utils.CreateTestReplica(1, 1, []int64{1, 2}))
- // set up nodes
- suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
- NodeID: 1,
- Address: "localhost",
- Hostname: "localhost",
- }))
- suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
- NodeID: 2,
- Address: "localhost",
- Hostname: "localhost",
- }))
- checker.meta.ResourceManager.HandleNodeUp(ctx, 1)
- checker.meta.ResourceManager.HandleNodeUp(ctx, 2)
-
- // set target
- segments := []*datapb.SegmentInfo{
- {
- ID: 1,
- PartitionID: 1,
- InsertChannel: "test-insert-channel",
- },
- }
-
- channels := []*datapb.VchannelInfo{
- {
- CollectionID: 1,
- ChannelName: "test-insert-channel",
- },
- }
-
- suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
- channels, segments, nil)
- checker.targetMgr.UpdateCollectionNextTarget(ctx, int64(1))
- checker.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(1))
-
- // set dist
- checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
- checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}))
-
- tasks := checker.Check(context.TODO())
- suite.Len(tasks, 1)
- suite.Len(tasks[0].Actions(), 1)
- action, ok := tasks[0].Actions()[0].(*task.SegmentAction)
- suite.True(ok)
- suite.EqualValues(1, tasks[0].ReplicaID())
- suite.Equal(task.ActionTypeGrow, action.Type())
- suite.EqualValues(1, action.GetSegmentID())
- suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
- // generated task should also be run under recovering mode
- // for normal replica, recovering segment is added based on current target
- suite.True(tasks[0].(*task.SegmentTask).Recovering())
-}
-
func (suite *SegmentCheckerTestSuite) TestLoadL0Segments() {
ctx := context.Background()
checker := suite.checker
@@ -370,7 +228,6 @@ func (suite *SegmentCheckerTestSuite) TestLoadL0Segments() {
suite.EqualValues(1, action.GetSegmentID())
suite.EqualValues(2, action.Node())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
- suite.True(tasks[0].(*task.SegmentTask).Recovering())
checker.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(1))
// test load l0 segments in current target
@@ -384,7 +241,6 @@ func (suite *SegmentCheckerTestSuite) TestLoadL0Segments() {
suite.EqualValues(1, action.GetSegmentID())
suite.EqualValues(2, action.Node())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
- suite.True(tasks[0].(*task.SegmentTask).Recovering())
// seg l0 segment exist on a non delegator node
checker.targetMgr.UpdateCollectionNextTarget(ctx, int64(1))
@@ -400,102 +256,6 @@ func (suite *SegmentCheckerTestSuite) TestLoadL0Segments() {
suite.EqualValues(1, action.GetSegmentID())
suite.EqualValues(2, action.Node())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
- suite.True(tasks[0].(*task.SegmentTask).Recovering())
-}
-
-func (suite *SegmentCheckerTestSuite) TestRecoverL0Segments() {
- ctx := context.Background()
- checker := suite.checker
- // set meta
- checker.meta.CollectionManager.PutCollection(ctx, utils.CreateTestCollection(1, 1))
- checker.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(1, 1))
- nodes := []int64{1, 2}
- replica := meta.NewRecoveringReplica(
- &querypb.Replica{
- ID: 1,
- CollectionID: 1,
- Nodes: nodes,
- ResourceGroup: meta.DefaultResourceGroupName,
- },
- typeutil.NewUniqueSet(nodes...),
- )
- checker.meta.ReplicaManager.Put(ctx, replica)
- suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
- NodeID: 1,
- Address: "localhost",
- Hostname: "localhost",
- Version: common.Version,
- }))
- suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
- NodeID: 2,
- Address: "localhost",
- Hostname: "localhost",
- Version: common.Version,
- }))
- checker.meta.ResourceManager.HandleNodeUp(ctx, 1)
- checker.meta.ResourceManager.HandleNodeUp(ctx, 2)
-
- // set target
- segments := []*datapb.SegmentInfo{
- {
- ID: 1,
- PartitionID: 1,
- InsertChannel: "test-insert-channel",
- Level: datapb.SegmentLevel_L0,
- },
- {
- ID: 2,
- PartitionID: 1,
- InsertChannel: "test-insert-channel",
- Level: datapb.SegmentLevel_L1,
- },
- }
-
- channels := []*datapb.VchannelInfo{
- {
- CollectionID: 1,
- ChannelName: "test-insert-channel",
- },
- }
-
- suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
- channels, segments, nil)
- checker.targetMgr.UpdateCollectionNextTarget(ctx, int64(1))
-
- // set dist
- checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
- checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}))
-
- // test load l0 segments in next target
- tasks := checker.Check(context.TODO())
- suite.Len(tasks, 1)
- suite.Len(tasks[0].Actions(), 1)
- action, ok := tasks[0].Actions()[0].(*task.SegmentAction)
- suite.True(ok)
- suite.EqualValues(1, tasks[0].ReplicaID())
- suite.Equal(task.ActionTypeGrow, action.Type())
- suite.EqualValues(1, action.GetSegmentID())
- suite.EqualValues(2, action.Node())
- suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
- suite.True(tasks[0].(*task.SegmentTask).Recovering())
-
- checker.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(1))
- // test load l0 segments in current target
- tasks = checker.Check(context.TODO())
- suite.Len(tasks, 1)
- suite.Len(tasks[0].Actions(), 1)
- action, ok = tasks[0].Actions()[0].(*task.SegmentAction)
- suite.True(ok)
- suite.EqualValues(1, tasks[0].ReplicaID())
- suite.Equal(task.ActionTypeGrow, action.Type())
- suite.EqualValues(1, action.GetSegmentID())
- suite.EqualValues(2, action.Node())
- suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
- suite.True(tasks[0].(*task.SegmentTask).Recovering())
-
- wrongRecovering := make([]bool, 1)
- wrongTasks := checker.createSegmentLoadTasks(ctx, segments, wrongRecovering, replica)
- suite.Nil(wrongTasks)
}
func (suite *SegmentCheckerTestSuite) TestReleaseL0Segments() {
diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go
index 062dddf972..e13eb273eb 100644
--- a/internal/querycoordv2/handlers.go
+++ b/internal/querycoordv2/handlers.go
@@ -132,7 +132,6 @@ func (s *Server) balanceSegments(ctx context.Context,
utils.ManualBalance,
collectionID,
plan.Replica,
- false,
actions...,
)
if err != nil {
diff --git a/internal/querycoordv2/meta/replica.go b/internal/querycoordv2/meta/replica.go
index fed25c7247..bee254aebd 100644
--- a/internal/querycoordv2/meta/replica.go
+++ b/internal/querycoordv2/meta/replica.go
@@ -25,7 +25,6 @@ type Replica struct {
// always keep consistent with replicaPB.RoNodes.
// node used by replica but cannot add more channel or segment ont it.
// include rebalance node or node out of resource group.
- recovering bool
}
// Deprecated: may break the consistency of ReplicaManager, use `Spawn` of `ReplicaManager` or `newReplica` instead.
@@ -39,17 +38,6 @@ func NewReplica(replica *querypb.Replica, nodes ...typeutil.UniqueSet) *Replica
return newReplica(r)
}
-// Deprecated: may break the consistency of ReplicaManager, use `Spawn` of `ReplicaManager` or `newReplica` instead.
-func NewRecoveringReplica(replica *querypb.Replica, nodes ...typeutil.UniqueSet) *Replica {
- r := proto.Clone(replica).(*querypb.Replica)
- // TODO: nodes is a bad parameter, break the consistency, should be removed in future.
- // keep it for old unittest.
- if len(nodes) > 0 && len(replica.Nodes) == 0 && nodes[0].Len() > 0 {
- r.Nodes = nodes[0].Collect()
- }
- return newRecoveringReplica(r)
-}
-
// newReplica creates a new replica from pb.
func newReplica(replica *querypb.Replica) *Replica {
return &Replica{
@@ -59,19 +47,6 @@ func newReplica(replica *querypb.Replica) *Replica {
}
}
-func newRecoveringReplica(replica *querypb.Replica) *Replica {
- return &Replica{
- replicaPB: proto.Clone(replica).(*querypb.Replica),
- rwNodes: typeutil.NewUniqueSet(replica.Nodes...),
- roNodes: typeutil.NewUniqueSet(replica.RoNodes...),
- recovering: true,
- }
-}
-
-func (replica *Replica) IsRecovering() bool {
- return replica.recovering
-}
-
// GetID returns the id of the replica.
func (replica *Replica) GetID() typeutil.UniqueID {
return replica.replicaPB.GetID()
@@ -176,10 +151,9 @@ func (replica *Replica) CopyForWrite() *mutableReplica {
return &mutableReplica{
Replica: &Replica{
- replicaPB: proto.Clone(replica.replicaPB).(*querypb.Replica),
- rwNodes: typeutil.NewUniqueSet(replica.replicaPB.Nodes...),
- roNodes: typeutil.NewUniqueSet(replica.replicaPB.RoNodes...),
- recovering: replica.IsRecovering(),
+ replicaPB: proto.Clone(replica.replicaPB).(*querypb.Replica),
+ rwNodes: typeutil.NewUniqueSet(replica.replicaPB.Nodes...),
+ roNodes: typeutil.NewUniqueSet(replica.replicaPB.RoNodes...),
},
exclusiveRWNodeToChannel: exclusiveRWNodeToChannel,
}
diff --git a/internal/querycoordv2/meta/replica_manager.go b/internal/querycoordv2/meta/replica_manager.go
index f7ba77449c..fdd867aa30 100644
--- a/internal/querycoordv2/meta/replica_manager.go
+++ b/internal/querycoordv2/meta/replica_manager.go
@@ -94,8 +94,7 @@ func (m *ReplicaManager) Recover(ctx context.Context, collections []int64) error
}
if collectionSet.Contain(replica.GetCollectionID()) {
- rep := newRecoveringReplica(replica)
- m.putReplicaInMemory(rep)
+ m.putReplicaInMemory(newReplica(replica))
log.Info("recover replica",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
@@ -298,15 +297,6 @@ func (m *ReplicaManager) RemoveCollection(ctx context.Context, collectionID type
return nil
}
-func (m *ReplicaManager) FinishRecoverReplica(replicaId typeutil.UniqueID) {
- m.rwmutex.Lock()
- defer m.rwmutex.Unlock()
- rep := m.replicas[replicaId]
- mutableRep := rep.CopyForWrite()
- mutableRep.recovering = false
- m.putReplicaInMemory(mutableRep.IntoReplica())
-}
-
func (m *ReplicaManager) RemoveReplicas(ctx context.Context, collectionID typeutil.UniqueID, replicas ...typeutil.UniqueID) error {
m.rwmutex.Lock()
defer m.rwmutex.Unlock()
diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go
index 030fb11552..faca7776fb 100644
--- a/internal/querycoordv2/task/executor.go
+++ b/internal/querycoordv2/task/executor.go
@@ -196,7 +196,6 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
if err != nil {
return err
}
- loadInfo.Recovering = task.Recovering()
req := packLoadSegmentRequest(
task,
diff --git a/internal/querycoordv2/task/task.go b/internal/querycoordv2/task/task.go
index e02d59d908..6fcbfb3eba 100644
--- a/internal/querycoordv2/task/task.go
+++ b/internal/querycoordv2/task/task.go
@@ -324,8 +324,7 @@ func (task *baseTask) Name() string {
type SegmentTask struct {
*baseTask
- segmentID typeutil.UniqueID
- recovering bool
+ segmentID typeutil.UniqueID
}
// NewSegmentTask creates a SegmentTask with actions,
@@ -336,7 +335,6 @@ func NewSegmentTask(ctx context.Context,
source Source,
collectionID typeutil.UniqueID,
replica *meta.Replica,
- recovering bool,
actions ...Action,
) (*SegmentTask, error) {
if len(actions) == 0 {
@@ -361,16 +359,11 @@ func NewSegmentTask(ctx context.Context,
base := newBaseTask(ctx, source, collectionID, replica, shard, fmt.Sprintf("SegmentTask-%s-%d", actions[0].Type().String(), segmentID))
base.actions = actions
return &SegmentTask{
- baseTask: base,
- segmentID: segmentID,
- recovering: recovering,
+ baseTask: base,
+ segmentID: segmentID,
}, nil
}
-func (task *SegmentTask) Recovering() bool {
- return task.recovering
-}
-
func (task *SegmentTask) SegmentID() typeutil.UniqueID {
return task.segmentID
}
@@ -384,7 +377,7 @@ func (task *SegmentTask) Name() string {
}
func (task *SegmentTask) String() string {
- return fmt.Sprintf("%s [segmentID=%d][recovering=%t]", task.baseTask.String(), task.segmentID, task.recovering)
+ return fmt.Sprintf("%s [segmentID=%d]", task.baseTask.String(), task.segmentID)
}
func (task *SegmentTask) MarshalJSON() ([]byte, error) {
diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go
index 6bab9f7b3d..f2a830fcb3 100644
--- a/internal/querycoordv2/task/task_test.go
+++ b/internal/querycoordv2/task/task_test.go
@@ -459,7 +459,6 @@ func (suite *TaskSuite) TestLoadSegmentTask() {
WrapIDSource(0),
suite.collection,
suite.replica,
- false,
NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment),
)
suite.NoError(err)
@@ -561,7 +560,6 @@ func (suite *TaskSuite) TestLoadSegmentTaskNotIndex() {
WrapIDSource(0),
suite.collection,
suite.replica,
- false,
NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment),
)
suite.NoError(err)
@@ -656,7 +654,6 @@ func (suite *TaskSuite) TestLoadSegmentTaskFailed() {
WrapIDSource(0),
suite.collection,
suite.replica,
- false,
NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment),
)
suite.NoError(err)
@@ -722,7 +719,6 @@ func (suite *TaskSuite) TestReleaseSegmentTask() {
WrapIDSource(0),
suite.collection,
suite.replica,
- false,
NewSegmentAction(targetNode, ActionTypeReduce, channel.GetChannelName(), segment),
)
suite.NoError(err)
@@ -767,7 +763,6 @@ func (suite *TaskSuite) TestReleaseGrowingSegmentTask() {
WrapIDSource(0),
suite.collection,
suite.replica,
- false,
NewSegmentActionWithScope(targetNode, ActionTypeReduce, "", segment, querypb.DataScope_Streaming, 0),
)
suite.NoError(err)
@@ -864,7 +859,6 @@ func (suite *TaskSuite) TestMoveSegmentTask() {
WrapIDSource(0),
suite.collection,
suite.replica,
- false,
NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment),
NewSegmentAction(sourceNode, ActionTypeReduce, channel.GetChannelName(), segment),
)
@@ -949,7 +943,6 @@ func (suite *TaskSuite) TestMoveSegmentTaskStale() {
WrapIDSource(0),
suite.collection,
suite.replica,
- false,
NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment),
NewSegmentAction(sourceNode, ActionTypeReduce, channel.GetChannelName(), segment),
)
@@ -1028,7 +1021,6 @@ func (suite *TaskSuite) TestTaskCanceled() {
WrapIDSource(0),
suite.collection,
suite.replica,
- false,
NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment),
)
suite.NoError(err)
@@ -1114,7 +1106,6 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
WrapIDSource(0),
suite.collection,
suite.replica,
- false,
NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment),
)
suite.NoError(err)
@@ -1335,19 +1326,19 @@ func (suite *TaskSuite) TestCreateTaskBehavior() {
suite.ErrorIs(err, merr.ErrParameterInvalid)
suite.Nil(chanelTask)
- segmentTask, err := NewSegmentTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, meta.NilReplica, false)
+ segmentTask, err := NewSegmentTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, meta.NilReplica)
suite.ErrorIs(err, merr.ErrParameterInvalid)
suite.Nil(segmentTask)
channelAction := NewChannelAction(0, 0, "fake-channel1")
- segmentTask, err = NewSegmentTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, meta.NilReplica, false, channelAction)
+ segmentTask, err = NewSegmentTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, meta.NilReplica, channelAction)
suite.ErrorIs(err, merr.ErrParameterInvalid)
suite.Nil(segmentTask)
segmentAction1 := NewSegmentAction(0, 0, "", 0)
segmentAction2 := NewSegmentAction(0, 0, "", 1)
- segmentTask, err = NewSegmentTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, meta.NilReplica, false, segmentAction1, segmentAction2)
+ segmentTask, err = NewSegmentTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, meta.NilReplica, segmentAction1, segmentAction2)
suite.ErrorIs(err, merr.ErrParameterInvalid)
suite.Nil(segmentTask)
@@ -1368,7 +1359,6 @@ func (suite *TaskSuite) TestSegmentTaskReplace() {
WrapIDSource(0),
suite.collection,
suite.replica,
- false,
NewSegmentAction(targetNode, ActionTypeGrow, "", segment),
)
suite.NoError(err)
@@ -1386,7 +1376,6 @@ func (suite *TaskSuite) TestSegmentTaskReplace() {
WrapIDSource(0),
suite.collection,
suite.replica,
- false,
NewSegmentAction(targetNode, ActionTypeGrow, "", segment),
)
suite.NoError(err)
@@ -1406,7 +1395,6 @@ func (suite *TaskSuite) TestSegmentTaskReplace() {
WrapIDSource(0),
suite.collection,
suite.replica,
- false,
NewSegmentAction(targetNode, ActionTypeGrow, "", segment),
)
suite.NoError(err)
@@ -1447,7 +1435,6 @@ func (suite *TaskSuite) TestNoExecutor() {
WrapIDSource(0),
suite.collection,
suite.replica,
- false,
NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment),
)
suite.NoError(err)
@@ -1802,7 +1789,6 @@ func (suite *TaskSuite) TestGetTasksJSON() {
WrapIDSource(0),
suite.collection,
suite.replica,
- false,
NewSegmentAction(1, ActionTypeGrow, "", 1),
)
suite.NoError(err)
@@ -1844,7 +1830,6 @@ func (suite *TaskSuite) TestCalculateTaskDelta() {
WrapIDSource(0),
coll,
suite.replica,
- false,
NewSegmentActionWithScope(nodeID, ActionTypeGrow, "", segmentID, querypb.DataScope_Historical, 100),
)
task1.SetID(1)
@@ -1874,7 +1859,6 @@ func (suite *TaskSuite) TestCalculateTaskDelta() {
WrapIDSource(0),
coll2,
suite.replica,
- false,
NewSegmentActionWithScope(nodeID2, ActionTypeGrow, "", segmentID2, querypb.DataScope_Historical, 100),
)
suite.NoError(err)
@@ -1994,7 +1978,6 @@ func (suite *TaskSuite) TestRemoveTaskWithError() {
WrapIDSource(0),
coll,
suite.replica,
- false,
NewSegmentActionWithScope(nodeID, ActionTypeGrow, "", 1, querypb.DataScope_Historical, 100),
)
suite.NoError(err)
diff --git a/internal/querycoordv2/task/utils_test.go b/internal/querycoordv2/task/utils_test.go
index 8d0cfdbaa8..56869c26df 100644
--- a/internal/querycoordv2/task/utils_test.go
+++ b/internal/querycoordv2/task/utils_test.go
@@ -44,7 +44,6 @@ func (s *UtilsSuite) TestPackLoadSegmentRequest() {
nil,
1,
newReplicaDefaultRG(10),
- false,
action,
)
s.NoError(err)
@@ -100,7 +99,6 @@ func (s *UtilsSuite) TestPackLoadSegmentRequestMmap() {
nil,
1,
newReplicaDefaultRG(10),
- false,
action,
)
s.NoError(err)
diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go
index 35089eb427..98b70aa979 100644
--- a/internal/querynodev2/segments/segment.go
+++ b/internal/querynodev2/segments/segment.go
@@ -822,8 +822,7 @@ func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCoun
Field: field,
EnableMMap: mmapEnabled,
}},
- RowCount: rowCount,
- Recovering: s.loadInfo.Load().GetRecovering(),
+ RowCount: rowCount,
}
GetLoadPool().Submit(func() (any, error) {
@@ -952,7 +951,7 @@ func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.Del
func GetCLoadInfoWithFunc(ctx context.Context,
fieldSchema *schemapb.FieldSchema,
- loadInfo *querypb.SegmentLoadInfo,
+ s *querypb.SegmentLoadInfo,
indexInfo *querypb.FieldIndexInfo,
f func(c *LoadIndexInfo) error,
) error {
@@ -986,9 +985,9 @@ func GetCLoadInfoWithFunc(ctx context.Context,
enableMmap := isIndexMmapEnable(fieldSchema, indexInfo)
indexInfoProto := &cgopb.LoadIndexInfo{
- CollectionID: loadInfo.GetCollectionID(),
- PartitionID: loadInfo.GetPartitionID(),
- SegmentID: loadInfo.GetSegmentID(),
+ CollectionID: s.GetCollectionID(),
+ PartitionID: s.GetPartitionID(),
+ SegmentID: s.GetSegmentID(),
Field: fieldSchema,
EnableMmap: enableMmap,
MmapDirPath: paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue(),
@@ -1000,7 +999,6 @@ func GetCLoadInfoWithFunc(ctx context.Context,
IndexEngineVersion: indexInfo.GetCurrentIndexVersion(),
IndexStoreVersion: indexInfo.GetIndexStoreVersion(),
IndexFileSize: indexInfo.GetIndexSize(),
- Recovering: loadInfo.GetRecovering(),
}
// 2.
@@ -1141,7 +1139,6 @@ func (s *LocalSegment) LoadTextIndex(ctx context.Context, textLogs *datapb.TextI
Schema: f,
CollectionID: s.Collection(),
PartitionID: s.Partition(),
- Recovering: s.LoadInfo().GetRecovering(),
}
marshaled, err := proto.Marshal(cgoProto)
@@ -1188,7 +1185,6 @@ func (s *LocalSegment) LoadJSONKeyIndex(ctx context.Context, jsonKeyStats *datap
Schema: f,
CollectionID: s.Collection(),
PartitionID: s.Partition(),
- Recovering: s.loadInfo.Load().GetRecovering(),
}
marshaled, err := proto.Marshal(cgoProto)
diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go
index 3533883631..5c7bcc06d1 100644
--- a/internal/querynodev2/segments/segment_loader.go
+++ b/internal/querynodev2/segments/segment_loader.go
@@ -879,8 +879,7 @@ func (loader *segmentLoader) LoadSegment(ctx context.Context,
log.Info("start loading segment files",
zap.Int64("rowNum", loadInfo.GetNumOfRows()),
- zap.String("segmentType", segment.Type().String()),
- zap.Bool("isRecovering", loadInfo.GetRecovering()))
+ zap.String("segmentType", segment.Type().String()))
collection := loader.manager.Collection.Get(segment.Collection())
if collection == nil {
diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go
index 69c9c2d1f7..b5bfc16b54 100644
--- a/internal/querynodev2/server.go
+++ b/internal/querynodev2/server.go
@@ -214,6 +214,8 @@ func (node *QueryNode) InitSegcore() error {
// set up thread pool for different priorities
cHighPriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsInt64())
C.InitHighPriorityThreadCoreCoefficient(cHighPriorityThreadCoreCoefficient)
+ cMiddlePriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt64())
+ C.InitMiddlePriorityThreadCoreCoefficient(cMiddlePriorityThreadCoreCoefficient)
cLowPriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt64())
C.InitLowPriorityThreadCoreCoefficient(cLowPriorityThreadCoreCoefficient)
diff --git a/internal/util/segcore/requests.go b/internal/util/segcore/requests.go
index 19218706fb..42eca2c6e1 100644
--- a/internal/util/segcore/requests.go
+++ b/internal/util/segcore/requests.go
@@ -34,10 +34,9 @@ type DeleteRequest struct {
}
type LoadFieldDataRequest struct {
- Fields []LoadFieldDataInfo
- MMapDir string
- RowCount int64
- Recovering bool
+ Fields []LoadFieldDataInfo
+ MMapDir string
+ RowCount int64
}
type LoadFieldDataInfo struct {
@@ -83,7 +82,6 @@ func (req *LoadFieldDataRequest) getCLoadFieldDataRequest() (result *cLoadFieldD
defer C.free(unsafe.Pointer(mmapDir))
C.AppendMMapDirPath(cLoadFieldDataInfo, mmapDir)
}
- C.SetRecovering(cLoadFieldDataInfo, C.bool(req.Recovering))
return &cLoadFieldDataRequest{
cLoadFieldDataInfo: cLoadFieldDataInfo,
}, nil
diff --git a/pkg/proto/cgo_msg.proto b/pkg/proto/cgo_msg.proto
index a5ba4e5d16..1ebc91dcbe 100644
--- a/pkg/proto/cgo_msg.proto
+++ b/pkg/proto/cgo_msg.proto
@@ -21,7 +21,6 @@ message LoadIndexInfo {
int64 index_store_version = 14;
int32 index_engine_version = 15;
int64 index_file_size = 16;
- bool recovering = 17;
}
message IndexStats {
diff --git a/pkg/proto/cgopb/cgo_msg.pb.go b/pkg/proto/cgopb/cgo_msg.pb.go
index 843c458631..90c297cc6b 100644
--- a/pkg/proto/cgopb/cgo_msg.pb.go
+++ b/pkg/proto/cgopb/cgo_msg.pb.go
@@ -41,7 +41,6 @@ type LoadIndexInfo struct {
IndexStoreVersion int64 `protobuf:"varint,14,opt,name=index_store_version,json=indexStoreVersion,proto3" json:"index_store_version,omitempty"`
IndexEngineVersion int32 `protobuf:"varint,15,opt,name=index_engine_version,json=indexEngineVersion,proto3" json:"index_engine_version,omitempty"`
IndexFileSize int64 `protobuf:"varint,16,opt,name=index_file_size,json=indexFileSize,proto3" json:"index_file_size,omitempty"`
- Recovering bool `protobuf:"varint,17,opt,name=recovering,proto3" json:"recovering,omitempty"`
}
func (x *LoadIndexInfo) Reset() {
@@ -181,13 +180,6 @@ func (x *LoadIndexInfo) GetIndexFileSize() int64 {
return 0
}
-func (x *LoadIndexInfo) GetRecovering() bool {
- if x != nil {
- return x.Recovering
- }
- return false
-}
-
type IndexStats struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -304,7 +296,7 @@ var file_cgo_msg_proto_rawDesc = []byte{
0x0a, 0x0d, 0x63, 0x67, 0x6f, 0x5f, 0x6d, 0x73, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12,
0x10, 0x6d, 0x69, 0x6c, 0x76, 0x75, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x63, 0x67,
0x6f, 0x1a, 0x0c, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22,
- 0xc6, 0x05, 0x0a, 0x0d, 0x4c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x49, 0x6e, 0x66,
+ 0xa6, 0x05, 0x0a, 0x0d, 0x4c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x49, 0x6e, 0x66,
0x6f, 0x12, 0x22, 0x0a, 0x0c, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49,
0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74,
0x69, 0x6f, 0x6e, 0x49, 0x44, 0x12, 0x20, 0x0a, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69,
@@ -342,9 +334,7 @@ var file_cgo_msg_proto_rawDesc = []byte{
0x64, 0x65, 0x78, 0x45, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e,
0x12, 0x26, 0x0a, 0x0f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x73,
0x69, 0x7a, 0x65, 0x18, 0x10, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x69, 0x6e, 0x64, 0x65, 0x78,
- 0x46, 0x69, 0x6c, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x1e, 0x0a, 0x0a, 0x72, 0x65, 0x63, 0x6f,
- 0x76, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x18, 0x11, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x72, 0x65,
- 0x63, 0x6f, 0x76, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x1a, 0x3e, 0x0a, 0x10, 0x49, 0x6e, 0x64, 0x65,
+ 0x46, 0x69, 0x6c, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x1a, 0x3e, 0x0a, 0x10, 0x49, 0x6e, 0x64, 0x65,
0x78, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03,
0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14,
0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76,
diff --git a/pkg/proto/index_cgo_msg.proto b/pkg/proto/index_cgo_msg.proto
index ff4bee23b5..4a91c07f0c 100644
--- a/pkg/proto/index_cgo_msg.proto
+++ b/pkg/proto/index_cgo_msg.proto
@@ -93,7 +93,6 @@ message LoadTextIndexInfo {
schema.FieldSchema schema = 5;
int64 collectionID = 6;
int64 partitionID = 7;
- bool recovering = 8;
}
message LoadJsonKeyIndexInfo {
@@ -104,5 +103,4 @@ message LoadJsonKeyIndexInfo {
schema.FieldSchema schema = 5;
int64 collectionID = 6;
int64 partitionID = 7;
- bool recovering = 8;
}
diff --git a/pkg/proto/indexcgopb/index_cgo_msg.pb.go b/pkg/proto/indexcgopb/index_cgo_msg.pb.go
index 906d38e755..e9eb05c5d0 100644
--- a/pkg/proto/indexcgopb/index_cgo_msg.pb.go
+++ b/pkg/proto/indexcgopb/index_cgo_msg.pb.go
@@ -773,7 +773,6 @@ type LoadTextIndexInfo struct {
Schema *schemapb.FieldSchema `protobuf:"bytes,5,opt,name=schema,proto3" json:"schema,omitempty"`
CollectionID int64 `protobuf:"varint,6,opt,name=collectionID,proto3" json:"collectionID,omitempty"`
PartitionID int64 `protobuf:"varint,7,opt,name=partitionID,proto3" json:"partitionID,omitempty"`
- Recovering bool `protobuf:"varint,8,opt,name=recovering,proto3" json:"recovering,omitempty"`
}
func (x *LoadTextIndexInfo) Reset() {
@@ -857,13 +856,6 @@ func (x *LoadTextIndexInfo) GetPartitionID() int64 {
return 0
}
-func (x *LoadTextIndexInfo) GetRecovering() bool {
- if x != nil {
- return x.Recovering
- }
- return false
-}
-
type LoadJsonKeyIndexInfo struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -876,7 +868,6 @@ type LoadJsonKeyIndexInfo struct {
Schema *schemapb.FieldSchema `protobuf:"bytes,5,opt,name=schema,proto3" json:"schema,omitempty"`
CollectionID int64 `protobuf:"varint,6,opt,name=collectionID,proto3" json:"collectionID,omitempty"`
PartitionID int64 `protobuf:"varint,7,opt,name=partitionID,proto3" json:"partitionID,omitempty"`
- Recovering bool `protobuf:"varint,8,opt,name=recovering,proto3" json:"recovering,omitempty"`
}
func (x *LoadJsonKeyIndexInfo) Reset() {
@@ -960,13 +951,6 @@ func (x *LoadJsonKeyIndexInfo) GetPartitionID() int64 {
return 0
}
-func (x *LoadJsonKeyIndexInfo) GetRecovering() bool {
- if x != nil {
- return x.Recovering
- }
- return false
-}
-
var File_index_cgo_msg_proto protoreflect.FileDescriptor
var file_index_cgo_msg_proto_rawDesc = []byte{
@@ -1111,7 +1095,7 @@ var file_index_cgo_msg_proto_rawDesc = []byte{
0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x73, 0x5f, 0x74, 0x61, 0x6e, 0x74, 0x69,
0x76, 0x79, 0x5f, 0x6d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x18, 0x16, 0x20, 0x01, 0x28, 0x03, 0x52,
0x19, 0x6a, 0x73, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x53, 0x74, 0x61, 0x74, 0x73, 0x54, 0x61, 0x6e,
- 0x74, 0x69, 0x76, 0x79, 0x4d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x22, 0x97, 0x02, 0x0a, 0x11, 0x4c,
+ 0x74, 0x69, 0x76, 0x79, 0x4d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x22, 0xf7, 0x01, 0x0a, 0x11, 0x4c,
0x6f, 0x61, 0x64, 0x54, 0x65, 0x78, 0x74, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x49, 0x6e, 0x66, 0x6f,
0x12, 0x18, 0x0a, 0x07, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28,
0x03, 0x52, 0x07, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x49, 0x44, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65,
@@ -1127,9 +1111,7 @@ var file_index_cgo_msg_proto_rawDesc = []byte{
0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e,
0x49, 0x44, 0x12, 0x20, 0x0a, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x49,
0x44, 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69,
- 0x6f, 0x6e, 0x49, 0x44, 0x12, 0x1e, 0x0a, 0x0a, 0x72, 0x65, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x69,
- 0x6e, 0x67, 0x18, 0x08, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x72, 0x65, 0x63, 0x6f, 0x76, 0x65,
- 0x72, 0x69, 0x6e, 0x67, 0x22, 0x9a, 0x02, 0x0a, 0x14, 0x4c, 0x6f, 0x61, 0x64, 0x4a, 0x73, 0x6f,
+ 0x6f, 0x6e, 0x49, 0x44, 0x22, 0xfa, 0x01, 0x0a, 0x14, 0x4c, 0x6f, 0x61, 0x64, 0x4a, 0x73, 0x6f,
0x6e, 0x4b, 0x65, 0x79, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x18, 0x0a,
0x07, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07,
0x46, 0x69, 0x65, 0x6c, 0x64, 0x49, 0x44, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69,
@@ -1145,9 +1127,7 @@ var file_index_cgo_msg_proto_rawDesc = []byte{
0x03, 0x52, 0x0c, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x44, 0x12,
0x20, 0x0a, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x44, 0x18, 0x07,
0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x49,
- 0x44, 0x12, 0x1e, 0x0a, 0x0a, 0x72, 0x65, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x18,
- 0x08, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x72, 0x65, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x69, 0x6e,
- 0x67, 0x42, 0x35, 0x5a, 0x33, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f,
+ 0x44, 0x42, 0x35, 0x5a, 0x33, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f,
0x6d, 0x69, 0x6c, 0x76, 0x75, 0x73, 0x2d, 0x69, 0x6f, 0x2f, 0x6d, 0x69, 0x6c, 0x76, 0x75, 0x73,
0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x76, 0x32, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x69, 0x6e,
0x64, 0x65, 0x78, 0x63, 0x67, 0x6f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
diff --git a/pkg/proto/query_coord.proto b/pkg/proto/query_coord.proto
index 47d6144a69..f0a2c546bc 100644
--- a/pkg/proto/query_coord.proto
+++ b/pkg/proto/query_coord.proto
@@ -375,7 +375,6 @@ message SegmentLoadInfo {
map textStatsLogs = 20;
repeated data.FieldBinlog bm25logs = 21;
map jsonKeyStatsLogs = 22;
- bool recovering = 23;
}
message FieldIndexInfo {
diff --git a/pkg/proto/querypb/query_coord.pb.go b/pkg/proto/querypb/query_coord.pb.go
index f343ea3051..49e2e37830 100644
--- a/pkg/proto/querypb/query_coord.pb.go
+++ b/pkg/proto/querypb/query_coord.pb.go
@@ -2049,7 +2049,6 @@ type SegmentLoadInfo struct {
TextStatsLogs map[int64]*datapb.TextIndexStats `protobuf:"bytes,20,rep,name=textStatsLogs,proto3" json:"textStatsLogs,omitempty" protobuf_key:"varint,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Bm25Logs []*datapb.FieldBinlog `protobuf:"bytes,21,rep,name=bm25logs,proto3" json:"bm25logs,omitempty"`
JsonKeyStatsLogs map[int64]*datapb.JsonKeyStats `protobuf:"bytes,22,rep,name=jsonKeyStatsLogs,proto3" json:"jsonKeyStatsLogs,omitempty" protobuf_key:"varint,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
- Recovering bool `protobuf:"varint,23,opt,name=recovering,proto3" json:"recovering,omitempty"`
}
func (x *SegmentLoadInfo) Reset() {
@@ -2239,13 +2238,6 @@ func (x *SegmentLoadInfo) GetJsonKeyStatsLogs() map[int64]*datapb.JsonKeyStats {
return nil
}
-func (x *SegmentLoadInfo) GetRecovering() bool {
- if x != nil {
- return x.Recovering
- }
- return false
-}
-
type FieldIndexInfo struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -7541,7 +7533,7 @@ var file_query_coord_proto_rawDesc = []byte{
0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63,
0x74, 0x69, 0x6f, 0x6e, 0x49, 0x44, 0x12, 0x21, 0x0a, 0x0c, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65,
0x6c, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x63, 0x68,
- 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0xe0, 0x0a, 0x0a, 0x0f, 0x53, 0x65,
+ 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0xc0, 0x0a, 0x0a, 0x0f, 0x53, 0x65,
0x67, 0x6d, 0x65, 0x6e, 0x74, 0x4c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1c, 0x0a,
0x09, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03,
0x52, 0x09, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x49, 0x44, 0x12, 0x20, 0x0a, 0x0b, 0x70,
@@ -7613,9 +7605,7 @@ var file_query_coord_proto_rawDesc = []byte{
0x74, 0x4c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x4a, 0x73, 0x6f, 0x6e, 0x4b, 0x65,
0x79, 0x53, 0x74, 0x61, 0x74, 0x73, 0x4c, 0x6f, 0x67, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52,
0x10, 0x6a, 0x73, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x53, 0x74, 0x61, 0x74, 0x73, 0x4c, 0x6f, 0x67,
- 0x73, 0x12, 0x1e, 0x0a, 0x0a, 0x72, 0x65, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x18,
- 0x17, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x72, 0x65, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x69, 0x6e,
- 0x67, 0x1a, 0x63, 0x0a, 0x12, 0x54, 0x65, 0x78, 0x74, 0x53, 0x74, 0x61, 0x74, 0x73, 0x4c, 0x6f,
+ 0x73, 0x1a, 0x63, 0x0a, 0x12, 0x54, 0x65, 0x78, 0x74, 0x53, 0x74, 0x61, 0x74, 0x73, 0x4c, 0x6f,
0x67, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01,
0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x37, 0x0a, 0x05, 0x76, 0x61, 0x6c,
0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x6d, 0x69, 0x6c, 0x76, 0x75,