diff --git a/internal/core/src/clustering/file_utils.h b/internal/core/src/clustering/file_utils.h
index f37d86d79a..f5e8b966c7 100644
--- a/internal/core/src/clustering/file_utils.h
+++ b/internal/core/src/clustering/file_utils.h
@@ -39,4 +39,30 @@ AddClusteringResultFiles(milvus::storage::ChunkManager* remote_chunk_manager,
     map[remote_prefix] = data_size;
 }
 
+void
+RemoveClusteringResultFiles(
+    milvus::storage::ChunkManager* remote_chunk_manager,
+    const std::unordered_map<std::string, int64_t>& map) {
+    auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);
+    std::vector<std::future<void>> futures;
+
+    for (auto& [file_path, file_size] : map) {
+        futures.push_back(pool.Submit(
+            [&, path = file_path]() { remote_chunk_manager->Remove(path); }));
+    }
+    std::exception_ptr first_exception = nullptr;
+    for (auto& future : futures) {
+        try {
+            future.get();
+        } catch (...) {
+            if (!first_exception) {
+                first_exception = std::current_exception();
+            }
+        }
+    }
+    if (first_exception) {
+        std::rethrow_exception(first_exception);
+    }
+}
+
 }  // namespace milvus::clustering
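RemoveClusteringResultFiles fans deletions out to the MIDDLE-priority pool, drains every future, and only then rethrows the first captured exception, so no task is abandoned mid-flight. A minimal self-contained sketch of that pattern, using std::async as a stand-in for the Milvus thread pool (an assumption; pool.Submit() is used but not defined in this diff):

```cpp
#include <exception>
#include <future>
#include <string>
#include <vector>

// Stand-in for remote_chunk_manager->Remove(path); a real implementation
// would issue the object-store delete and may throw on failure.
void RemoveRemote(const std::string& path) {
    (void)path;
}

void RemoveAll(const std::vector<std::string>& paths) {
    std::vector<std::future<void>> futures;
    futures.reserve(paths.size());
    for (const auto& p : paths) {
        futures.push_back(std::async(std::launch::async, RemoveRemote, p));
    }
    std::exception_ptr first = nullptr;
    for (auto& f : futures) {
        try {
            f.get();  // always drain: every task finishes before we unwind
        } catch (...) {
            if (!first) {
                first = std::current_exception();  // remember only the first failure
            }
        }
    }
    if (first) {
        std::rethrow_exception(first);  // report after all deletions were attempted
    }
}
```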
diff --git a/internal/core/src/common/Common.cpp b/internal/core/src/common/Common.cpp
index 837bf7204c..2984868534 100644
--- a/internal/core/src/common/Common.cpp
+++ b/internal/core/src/common/Common.cpp
@@ -22,6 +22,8 @@ namespace milvus {
 int64_t FILE_SLICE_SIZE = DEFAULT_INDEX_FILE_SLICE_SIZE;
 int64_t HIGH_PRIORITY_THREAD_CORE_COEFFICIENT =
     DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT;
+int64_t MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT =
+    DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
 int64_t LOW_PRIORITY_THREAD_CORE_COEFFICIENT =
     DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
 int CPU_NUM = DEFAULT_CPU_NUM;
@@ -44,6 +46,13 @@ SetHighPriorityThreadCoreCoefficient(const int64_t coefficient) {
              HIGH_PRIORITY_THREAD_CORE_COEFFICIENT);
 }
 
+void
+SetMiddlePriorityThreadCoreCoefficient(const int64_t coefficient) {
+    MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient;
+    LOG_INFO("set middle priority thread pool core coefficient: {}",
+             MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT);
+}
+
 void
 SetLowPriorityThreadCoreCoefficient(const int64_t coefficient) {
     LOW_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient;
diff --git a/internal/core/src/common/Common.h b/internal/core/src/common/Common.h
index 9cd9909c4b..3b7722fd0c 100644
--- a/internal/core/src/common/Common.h
+++ b/internal/core/src/common/Common.h
@@ -25,6 +25,7 @@ namespace milvus {
 extern int64_t FILE_SLICE_SIZE;
 extern int64_t HIGH_PRIORITY_THREAD_CORE_COEFFICIENT;
+extern int64_t MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
 extern int64_t LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
 extern int CPU_NUM;
 extern int64_t EXEC_EVAL_EXPR_BATCH_SIZE;
@@ -37,6 +38,9 @@ SetIndexSliceSize(const int64_t size);
 void
 SetHighPriorityThreadCoreCoefficient(const int64_t coefficient);
 
+void
+SetMiddlePriorityThreadCoreCoefficient(const int64_t coefficient);
+
 void
 SetLowPriorityThreadCoreCoefficient(const int64_t coefficient);
diff --git a/internal/core/src/common/Consts.h b/internal/core/src/common/Consts.h
index edd0dde60a..3548e2d3ce 100644
--- a/internal/core/src/common/Consts.h
+++ b/internal/core/src/common/Consts.h
@@ -57,6 +57,7 @@ const char DEFAULT_TASK_ID[] = "0";
 const int64_t DEFAULT_FIELD_MAX_MEMORY_LIMIT = 128 << 20;  // bytes
 
 const int64_t DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT = 10;
+const int64_t DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT = 5;
 const int64_t DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT = 1;
 
 const int64_t DEFAULT_INDEX_FILE_SLICE_SIZE = 16 << 20;  // bytes
diff --git a/internal/core/src/common/LoadInfo.h b/internal/core/src/common/LoadInfo.h
index 7dc2691456..00774b8162 100644
--- a/internal/core/src/common/LoadInfo.h
+++ b/internal/core/src/common/LoadInfo.h
@@ -40,7 +40,6 @@ struct LoadFieldDataInfo {
     std::string mmap_dir_path = "";
     std::string url;
     int64_t storage_version = 0;
-    bool recovering = false;
 };
 
 struct LoadDeletedRecordInfo {
diff --git a/internal/core/src/common/init_c.cpp b/internal/core/src/common/init_c.cpp
index 1c9bcc3454..b0058e4e8e 100644
--- a/internal/core/src/common/init_c.cpp
+++ b/internal/core/src/common/init_c.cpp
@@ -45,6 +45,16 @@ InitHighPriorityThreadCoreCoefficient(const int64_t value) {
         value);
 }
 
+void
+InitMiddlePriorityThreadCoreCoefficient(const int64_t value) {
+    std::call_once(
+        flag4,
+        [](int64_t value) {
+            milvus::SetMiddlePriorityThreadCoreCoefficient(value);
+        },
+        value);
+}
+
 void
 InitLowPriorityThreadCoreCoefficient(const int64_t value) {
     std::call_once(
diff --git a/internal/core/src/common/init_c.h b/internal/core/src/common/init_c.h
index 0e114646f2..1895667621 100644
--- a/internal/core/src/common/init_c.h
+++ b/internal/core/src/common/init_c.h
@@ -30,6 +30,9 @@ InitIndexSliceSize(const int64_t);
 void
 InitHighPriorityThreadCoreCoefficient(const int64_t);
 
+void
+InitMiddlePriorityThreadCoreCoefficient(const int64_t);
+
 void
 InitLowPriorityThreadCoreCoefficient(const int64_t);
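The Init* wrappers make the CGO-facing setters idempotent: several components may race to initialize segcore, and std::call_once guarantees the coefficient is latched exactly once. A reduced sketch of that shape (flag4 is the once_flag the new function reuses in init_c.cpp; the sibling flags are not shown in this diff):

```cpp
#include <cstdint>
#include <mutex>

namespace {
std::once_flag middle_coeff_flag;             // plays the role of flag4 above
int64_t middle_priority_coefficient = 5;      // DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT
}  // namespace

void InitMiddlePriorityThreadCoreCoefficient(int64_t value) {
    std::call_once(middle_coeff_flag, [value] {
        middle_priority_coefficient = value;  // later callers become no-ops
    });
}
```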
diff --git a/internal/core/src/index/BitmapIndex.cpp b/internal/core/src/index/BitmapIndex.cpp
index 5b8977c831..716faeb32e 100644
--- a/internal/core/src/index/BitmapIndex.cpp
+++ b/internal/core/src/index/BitmapIndex.cpp
@@ -569,13 +569,12 @@ BitmapIndex<T>::LoadWithoutAssemble(const BinarySet& binary_set,
 template <typename T>
 void
 BitmapIndex<T>::Load(milvus::tracer::TraceContext ctx, const Config& config) {
-    LOG_INFO("load bitmap index with config {}", config.dump());
+    LOG_DEBUG("load bitmap index with config {}", config.dump());
     auto index_files =
         GetValueFromConfig<std::vector<std::string>>(config, "index_files");
     AssertInfo(index_files.has_value(),
                "index file paths is empty when load bitmap index");
-    auto index_datas = file_manager_->LoadIndexToMemory(
-        index_files.value(), config[milvus::THREAD_POOL_PRIORITY]);
+    auto index_datas = file_manager_->LoadIndexToMemory(index_files.value());
     AssembleIndexDatas(index_datas);
     BinarySet binary_set;
     for (auto& [key, data] : index_datas) {
diff --git a/internal/core/src/index/HybridScalarIndex.cpp b/internal/core/src/index/HybridScalarIndex.cpp
index 4f2c61904b..304fea6706 100644
--- a/internal/core/src/index/HybridScalarIndex.cpp
+++ b/internal/core/src/index/HybridScalarIndex.cpp
@@ -361,8 +361,7 @@ HybridScalarIndex<T>::Load(milvus::tracer::TraceContext ctx,
     auto index_type_file = GetRemoteIndexTypeFile(index_files.value());
 
     auto index_datas = mem_file_manager_->LoadIndexToMemory(
-        std::vector<std::string>{index_type_file},
-        config[milvus::THREAD_POOL_PRIORITY]);
+        std::vector<std::string>{index_type_file});
     AssembleIndexDatas(index_datas);
     BinarySet binary_set;
     for (auto& [key, data] : index_datas) {
diff --git a/internal/core/src/index/InvertedIndexTantivy.cpp b/internal/core/src/index/InvertedIndexTantivy.cpp
index 7fe384c47e..caafa1b55a 100644
--- a/internal/core/src/index/InvertedIndexTantivy.cpp
+++ b/internal/core/src/index/InvertedIndexTantivy.cpp
@@ -214,16 +214,15 @@ InvertedIndexTantivy<T>::Load(milvus::tracer::TraceContext ctx,
             }
         }
 
-        auto index_datas = mem_file_manager_->LoadIndexToMemory(
-            null_offset_files, config[milvus::THREAD_POOL_PRIORITY]);
+        auto index_datas =
+            mem_file_manager_->LoadIndexToMemory(null_offset_files);
         AssembleIndexDatas(index_datas);
         null_offset_data = index_datas.at(INDEX_NULL_OFFSET_FILE_NAME);
     } else if (auto it = find_file(INDEX_NULL_OFFSET_FILE_NAME);
                it != inverted_index_files.end()) {
         // null offset file is not sliced
         null_offset_files.push_back(*it);
-        auto index_datas = mem_file_manager_->LoadIndexToMemory(
-            {*it}, config[milvus::THREAD_POOL_PRIORITY]);
+        auto index_datas = mem_file_manager_->LoadIndexToMemory({*it});
         null_offset_data = index_datas.at(INDEX_NULL_OFFSET_FILE_NAME);
     }
 
@@ -245,8 +244,7 @@ InvertedIndexTantivy<T>::Load(milvus::tracer::TraceContext ctx,
                            file) != null_offset_files.end();
             }),
         inverted_index_files.end());
-    disk_file_manager_->CacheIndexToDisk(inverted_index_files,
-                                         config[milvus::THREAD_POOL_PRIORITY]);
+    disk_file_manager_->CacheIndexToDisk(inverted_index_files);
     path_ = prefix;
     wrapper_ = std::make_shared<TantivyIndexWrapper>(prefix.c_str());
 }
diff --git a/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp b/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp
index 307456d299..a3bbd5cbb8 100644
--- a/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp
+++ b/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp
@@ -383,8 +383,7 @@ JsonKeyStatsInvertedIndex::Load(milvus::tracer::TraceContext ctx,
             index_file = remote_prefix + "/" + index_file;
         }
     }
-    disk_file_manager_->CacheJsonKeyIndexToDisk(
-        index_files.value(), config[milvus::THREAD_POOL_PRIORITY]);
+    disk_file_manager_->CacheJsonKeyIndexToDisk(index_files.value());
     AssertInfo(
         tantivy_index_exist(path_.c_str()), "index not exist: {}", path_);
     wrapper_ = std::make_shared<TantivyIndexWrapper>(path_.c_str());
diff --git a/internal/core/src/index/ScalarIndexSort.cpp b/internal/core/src/index/ScalarIndexSort.cpp
index ff370fb8f9..1fd10d2118 100644
--- a/internal/core/src/index/ScalarIndexSort.cpp
+++ b/internal/core/src/index/ScalarIndexSort.cpp
@@ -206,8 +206,7 @@ ScalarIndexSort<T>::Load(milvus::tracer::TraceContext ctx,
         GetValueFromConfig<std::vector<std::string>>(config, "index_files");
     AssertInfo(index_files.has_value(),
                "index file paths is empty when load disk ann index");
-    auto index_datas = file_manager_->LoadIndexToMemory(
-        index_files.value(), config[milvus::THREAD_POOL_PRIORITY]);
+    auto index_datas = file_manager_->LoadIndexToMemory(index_files.value());
     AssembleIndexDatas(index_datas);
     BinarySet binary_set;
     for (auto& [key, data] : index_datas) {
diff --git a/internal/core/src/index/StringIndexMarisa.cpp b/internal/core/src/index/StringIndexMarisa.cpp
index d56864293b..3182a8bda5 100644
--- a/internal/core/src/index/StringIndexMarisa.cpp
+++ b/internal/core/src/index/StringIndexMarisa.cpp
@@ -233,8 +233,7 @@ StringIndexMarisa::Load(milvus::tracer::TraceContext ctx,
         GetValueFromConfig<std::vector<std::string>>(config, "index_files");
     AssertInfo(index_files.has_value(),
                "index file paths is empty when load index");
-    auto index_datas = file_manager_->LoadIndexToMemory(
-        index_files.value(), config[milvus::THREAD_POOL_PRIORITY]);
+    auto index_datas = file_manager_->LoadIndexToMemory(index_files.value());
     AssembleIndexDatas(index_datas);
     BinarySet binary_set;
     for (auto& [key, data] : index_datas) {
diff --git a/internal/core/src/index/TextMatchIndex.cpp b/internal/core/src/index/TextMatchIndex.cpp
index 3db642548a..ee69d91044 100644
--- a/internal/core/src/index/TextMatchIndex.cpp
+++ b/internal/core/src/index/TextMatchIndex.cpp
@@ -136,8 +136,7 @@ TextMatchIndex::Load(const Config& config) {
         std::vector<std::string> file;
         file.push_back(*it);
         files_value.erase(it);
-        auto index_datas = mem_file_manager_->LoadIndexToMemory(
-            file, config[milvus::THREAD_POOL_PRIORITY]);
+        auto index_datas = mem_file_manager_->LoadIndexToMemory(file);
         AssembleIndexDatas(index_datas);
         BinarySet binary_set;
         for (auto& [key, data] : index_datas) {
@@ -154,8 +153,7 @@ TextMatchIndex::Load(const Config& config) {
             index_valid_data->data.get(),
             (size_t)index_valid_data->size);
     }
-    disk_file_manager_->CacheTextLogToDisk(
-        files_value, config[milvus::THREAD_POOL_PRIORITY]);
+    disk_file_manager_->CacheTextLogToDisk(files_value);
     AssertInfo(
         tantivy_index_exist(prefix.c_str()), "index not exist: {}", prefix);
     wrapper_ = std::make_shared<TantivyIndexWrapper>(prefix.c_str());
diff --git a/internal/core/src/index/VectorDiskIndex.cpp b/internal/core/src/index/VectorDiskIndex.cpp
index 51af03f849..ac0314420c 100644
--- a/internal/core/src/index/VectorDiskIndex.cpp
+++ b/internal/core/src/index/VectorDiskIndex.cpp
@@ -96,8 +96,7 @@ VectorDiskAnnIndex<T>::Load(milvus::tracer::TraceContext ctx,
         GetValueFromConfig<std::vector<std::string>>(config, "index_files");
     AssertInfo(index_files.has_value(),
                "index file paths is empty when load disk ann index data");
-    file_manager_->CacheIndexToDisk(index_files.value(),
-                                    config[milvus::THREAD_POOL_PRIORITY]);
+    file_manager_->CacheIndexToDisk(index_files.value());
     read_file_span->End();
 }
diff --git a/internal/core/src/index/VectorMemIndex.cpp b/internal/core/src/index/VectorMemIndex.cpp
index 2c635118d6..7c492f7b3a 100644
--- a/internal/core/src/index/VectorMemIndex.cpp
+++ b/internal/core/src/index/VectorMemIndex.cpp
@@ -210,8 +210,8 @@ VectorMemIndex<T>::Load(milvus::tracer::TraceContext ctx,
             std::string index_file_prefix = slice_meta_filepath.substr(
                 0, slice_meta_filepath.find_last_of('/') + 1);
 
-            auto result = file_manager_->LoadIndexToMemory(
-                {slice_meta_filepath}, config[milvus::THREAD_POOL_PRIORITY]);
+            auto result =
+                file_manager_->LoadIndexToMemory({slice_meta_filepath});
             auto raw_slice_meta = result[INDEX_FILE_SLICE_META];
             Config meta_data = Config::parse(
                 std::string(static_cast<const char*>(raw_slice_meta->Data()),
@@ -231,8 +231,7 @@ VectorMemIndex<T>::Load(milvus::tracer::TraceContext ctx,
                     batch.push_back(index_file_prefix + file_name);
                 }
 
-                auto batch_data = file_manager_->LoadIndexToMemory(
-                    batch, config[milvus::THREAD_POOL_PRIORITY]);
+                auto batch_data = file_manager_->LoadIndexToMemory(batch);
                 for (const auto& file_path : batch) {
                     const std::string file_name =
                         file_path.substr(file_path.find_last_of('/') + 1);
@@ -254,10 +253,9 @@ VectorMemIndex<T>::Load(milvus::tracer::TraceContext ctx,
         }
 
         if (!pending_index_files.empty()) {
-            auto result = file_manager_->LoadIndexToMemory(
-                std::vector<std::string>(pending_index_files.begin(),
-                                         pending_index_files.end()),
-                config[milvus::THREAD_POOL_PRIORITY]);
+            auto result =
+                file_manager_->LoadIndexToMemory(std::vector<std::string>(
+                    pending_index_files.begin(), pending_index_files.end()));
             for (auto&& index_data : result) {
                 index_datas.insert(std::move(index_data));
             }
@@ -569,8 +567,7 @@ VectorMemIndex<T>::LoadFromFile(const Config& config) {
     std::vector<std::string> batch{};
     batch.reserve(parallel_degree);
 
-    auto result = file_manager_->LoadIndexToMemory(
-        {slice_meta_filepath}, config[milvus::THREAD_POOL_PRIORITY]);
+    auto result = file_manager_->LoadIndexToMemory({slice_meta_filepath});
     auto raw_slice_meta = result[INDEX_FILE_SLICE_META];
     Config meta_data = Config::parse(
         std::string(static_cast<const char*>(raw_slice_meta->Data()),
@@ -582,8 +579,7 @@ VectorMemIndex<T>::LoadFromFile(const Config& config) {
         auto total_len = static_cast<size_t>(item[TOTAL_LEN]);
         auto HandleBatch = [&](int index) {
             auto start_load2_mem = std::chrono::system_clock::now();
-            auto batch_data = file_manager_->LoadIndexToMemory(
-                batch, config[milvus::THREAD_POOL_PRIORITY]);
+            auto batch_data = file_manager_->LoadIndexToMemory(batch);
             load_duration_sum +=
                 (std::chrono::system_clock::now() - start_load2_mem);
             for (int j = index - batch.size() + 1; j <= index; j++) {
@@ -621,10 +617,8 @@ VectorMemIndex<T>::LoadFromFile(const Config& config) {
     } else {
         //1. load files into memory
        auto start_load_files2_mem = std::chrono::system_clock::now();
-        auto result = file_manager_->LoadIndexToMemory(
-            std::vector<std::string>(pending_index_files.begin(),
-                                     pending_index_files.end()),
-            config[milvus::THREAD_POOL_PRIORITY]);
+        auto result = file_manager_->LoadIndexToMemory(std::vector<std::string>(
+            pending_index_files.begin(), pending_index_files.end()));
         load_duration_sum +=
             (std::chrono::system_clock::now() - start_load_files2_mem);
         //2. write data into files
diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp
index eaa127118c..b02810b405 100644
--- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp
+++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp
@@ -243,17 +243,15 @@ ChunkedSegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& load_info) {
             auto parallel_degree = static_cast<uint64_t>(
                 DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
             field_data_info.arrow_reader_channel->set_capacity(parallel_degree * 2);
-            auto priority = load_info.recovering ? milvus::ThreadPoolPriority::HIGH
-                                                 : milvus::ThreadPoolPriority::LOW;
-            LoadArrowReaderFromRemote(
-                insert_files, field_data_info.arrow_reader_channel, priority);
+            auto& pool =
+                ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);
+            pool.Submit(LoadArrowReaderFromRemote,
+                        insert_files,
+                        field_data_info.arrow_reader_channel);
 
-            LOG_INFO(
-                "segment {} submits load field {} task to thread pool, "
-                "recovering:{}",
-                this->get_segment_id(),
-                field_id.get(),
-                load_info.recovering);
+            LOG_INFO("segment {} submits load field {} task to thread pool",
+                     this->get_segment_id(),
+                     field_id.get());
 
             bool use_mmap = false;
             if (!info.enable_mmap ||
                 SystemProperty::Instance().IsSystem(field_id)) {
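The sealed-segment path now submits the whole-field download as one MIDDLE-priority task and lets the caller stream results out of a capacity-bounded channel (set_capacity(parallel_degree * 2) above), which caps how many decoded slices sit in memory at once. A sketch of that bounded producer/consumer shape; the channel here is a simplified stand-in for Milvus's ArrowReaderChannel/FieldDataChannel, whose internals are not part of this diff:

```cpp
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>

template <typename T>
class BoundedChannel {
 public:
    explicit BoundedChannel(size_t capacity) : capacity_(capacity) {}

    void push(T value) {
        std::unique_lock<std::mutex> lk(mu_);
        not_full_.wait(lk, [&] { return queue_.size() < capacity_; });
        queue_.push(std::move(value));
        not_empty_.notify_one();
    }

    // Returns empty once the channel is closed and fully drained.
    std::optional<T> pop() {
        std::unique_lock<std::mutex> lk(mu_);
        not_empty_.wait(lk, [&] { return closed_ || !queue_.empty(); });
        if (queue_.empty()) {
            return std::nullopt;
        }
        T value = std::move(queue_.front());
        queue_.pop();
        not_full_.notify_one();
        return value;
    }

    void close() {
        std::lock_guard<std::mutex> lk(mu_);
        closed_ = true;
        not_empty_.notify_all();
    }

 private:
    size_t capacity_;
    bool closed_ = false;
    std::queue<T> queue_;
    std::mutex mu_;
    std::condition_variable not_empty_, not_full_;
};
```

The consumer keeps calling pop() until the channel reports empty-and-closed, which mirrors the `while (channel->pop(field_data))` loop in CollectFieldDataChannel below.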
diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp
index 08fda5d0a3..20a9939073 100644
--- a/internal/core/src/segcore/SegmentGrowingImpl.cpp
+++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp
@@ -222,8 +222,6 @@ SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) {
     AssertInfo(infos.field_infos.find(primary_field_id.get()) !=
                    infos.field_infos.end(),
               "primary field data should be included");
-    auto priority = infos.recovering ? milvus::ThreadPoolPriority::HIGH
-                                     : milvus::ThreadPoolPriority::LOW;
 
     size_t num_rows = storage::GetNumRowsForLoadInfo(infos);
     auto reserved_offset = PreInsert(num_rows);
@@ -236,31 +234,29 @@ SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) {
                       return std::stol(a.substr(a.find_last_of('/') + 1)) <
                              std::stol(b.substr(b.find_last_of('/') + 1));
                   });
-        LOG_INFO(
-            "segment {} loads field {} with num_rows {}, insert_files_count:{}",
-            this->get_segment_id(),
-            field_id.get(),
-            num_rows,
-            insert_files.size());
 
         auto channel = std::make_shared<FieldDataChannel>();
-        LoadFieldDatasFromRemote(insert_files, channel, priority);
-        auto loaded_field_datas_info =
-            storage::CollectFieldDataChannelWithInfos(channel);
-        LOG_INFO(
-            "segment {} loads field {} with num_rows {}, loaded_data_size:{}",
-            this->get_segment_id(),
-            field_id.get(),
-            num_rows,
-            loaded_field_datas_info.data_size_);
-        auto& loaded_field_datas = loaded_field_datas_info.loaded_field_datas_;
+        auto& pool =
+            ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);
+
+        LOG_INFO("segment {} loads field {} with num_rows {}",
+                 this->get_segment_id(),
+                 field_id.get(),
+                 num_rows);
+        auto load_future =
+            pool.Submit(LoadFieldDatasFromRemote, insert_files, channel);
+
+        LOG_INFO("segment {} submits load field {} task to thread pool",
+                 this->get_segment_id(),
+                 field_id.get());
+        auto field_data = storage::CollectFieldDataChannel(channel);
         if (field_id == TimestampFieldID) {
             // step 2: sort timestamp
             // query node already guarantees that the timestamp is ordered, avoid field data copy in c++
 
             // step 3: fill into Segment.ConcurrentVector
             insert_record_.timestamps_.set_data_raw(reserved_offset,
-                                                    loaded_field_datas);
+                                                    field_data);
             continue;
         }
@@ -271,14 +267,14 @@ SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) {
         if (!indexing_record_.HasRawData(field_id)) {
             if (insert_record_.is_valid_data_exist(field_id)) {
                 insert_record_.get_valid_data(field_id)->set_data_raw(
-                    loaded_field_datas);
+                    field_data);
             }
             insert_record_.get_data_base(field_id)->set_data_raw(
-                reserved_offset, loaded_field_datas);
+                reserved_offset, field_data);
         }
         if (segcore_config_.get_enable_interim_segment_index()) {
             auto offset = reserved_offset;
-            for (auto& data : loaded_field_datas) {
+            for (auto& data : field_data) {
                 auto row_count = data->get_num_rows();
                 indexing_record_.AppendingIndex(
                     offset, row_count, field_id, data, insert_record_);
@@ -288,7 +284,7 @@ SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) {
         try_remove_chunks(field_id);
 
         if (field_id == primary_field_id) {
-            insert_record_.insert_pks(loaded_field_datas);
+            insert_record_.insert_pks(field_data);
         }
 
         // update average row data size
@@ -297,13 +293,13 @@ SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) {
             SegmentInternalInterface::set_field_avg_size(
                 field_id,
                 num_rows,
-                storage::GetByteSizeOfFieldDatas(loaded_field_datas));
+                storage::GetByteSizeOfFieldDatas(field_data));
         }
 
         // build text match index
         if (field_meta.enable_match()) {
             auto index = GetTextIndex(field_id);
-            index->BuildIndexFromFieldData(loaded_field_datas,
+            index->BuildIndexFromFieldData(field_data,
                                            field_meta.is_nullable());
             index->Commit();
             // Reload reader so that the index can be read immediately
@@ -313,20 +309,18 @@ SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) {
         // build json match index
         if (field_meta.enable_growing_jsonStats()) {
             auto index = GetJsonKeyIndex(field_id);
-            index->BuildWithFieldData(loaded_field_datas,
-                                      field_meta.is_nullable());
+            index->BuildWithFieldData(field_data, field_meta.is_nullable());
             index->Commit();
             // Reload reader so that the index can be read immediately
             index->Reload();
         }
 
         // update the mem size
-        stats_.mem_size += storage::GetByteSizeOfFieldDatas(loaded_field_datas);
+        stats_.mem_size += storage::GetByteSizeOfFieldDatas(field_data);
 
-        LOG_INFO("segment {} loads field {} done, recovering:{}",
+        LOG_INFO("segment {} loads field {} done",
                  this->get_segment_id(),
-                 field_id.get(),
-                 infos.recovering);
+                 field_id.get());
     }
 
     // step 5: update small indexes
diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp
index 93b6edc4ad..df5b13b209 100644
--- a/internal/core/src/segcore/SegmentSealedImpl.cpp
+++ b/internal/core/src/segcore/SegmentSealedImpl.cpp
@@ -320,17 +320,14 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& load_info) {
             auto parallel_degree = static_cast<uint64_t>(
                 DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
             field_data_info.channel->set_capacity(parallel_degree * 2);
-            auto priority = load_info.recovering ? milvus::ThreadPoolPriority::HIGH
-                                                 : milvus::ThreadPoolPriority::LOW;
-            LoadFieldDatasFromRemote(
-                insert_files, field_data_info.channel, priority);
+            auto& pool =
+                ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);
+            pool.Submit(
+                LoadFieldDatasFromRemote, insert_files, field_data_info.channel);
 
-            LOG_INFO(
-                "segment {} submits load field {} task to thread pool, "
-                "recovering:{}",
-                this->get_segment_id(),
-                field_id.get(),
-                load_info.recovering);
+            LOG_INFO("segment {} submits load field {} task to thread pool",
+                     this->get_segment_id(),
+                     field_id.get());
 
             bool use_mmap = false;
             if (!info.enable_mmap ||
                 SystemProperty::Instance().IsSystem(field_id)) {
diff --git a/internal/core/src/segcore/Types.h b/internal/core/src/segcore/Types.h
index a8751722a4..c1cf8e524a 100644
--- a/internal/core/src/segcore/Types.h
+++ b/internal/core/src/segcore/Types.h
@@ -51,8 +51,6 @@ struct LoadIndexInfo {
     // (aka. the filesize before loading operation at knowhere),
     // because the uncompressed-index-file-size may not be stored at previous milvus.
     // so the size may be not accurate (generated by the compressed-index-file-size multiplied with a compress-ratio)
-    bool
-        recovering;  //indicator to separate recovering load and normal load, to reduce influence for front-end search&&query
 };
 
 }  // namespace milvus::segcore
diff --git a/internal/core/src/segcore/Utils.cpp b/internal/core/src/segcore/Utils.cpp
index 84c2f1b82f..a04e62671f 100644
--- a/internal/core/src/segcore/Utils.cpp
+++ b/internal/core/src/segcore/Utils.cpp
@@ -869,12 +869,12 @@ ReverseDataFromIndex(const index::IndexBase* index,
 // segcore use default remote chunk manager to load data from minio/s3
 void
 LoadArrowReaderFromRemote(const std::vector<std::string>& remote_files,
-                          std::shared_ptr<ArrowReaderChannel> channel,
-                          milvus::ThreadPoolPriority priority) {
+                          std::shared_ptr<ArrowReaderChannel> channel) {
     try {
         auto rcm = storage::RemoteChunkManagerSingleton::GetInstance()
                        .GetRemoteChunkManager();
-        auto& pool = ThreadPools::GetThreadPool(priority);
+        auto& pool = ThreadPools::GetThreadPool(ThreadPoolPriority::HIGH);
+
         std::vector<std::future<std::shared_ptr<ArrowDataWrapper>>> futures;
         futures.reserve(remote_files.size());
@@ -905,12 +905,12 @@ LoadArrowReaderFromRemote(const std::vector<std::string>& remote_files,
 void
 LoadFieldDatasFromRemote(const std::vector<std::string>& remote_files,
-                         FieldDataChannelPtr channel,
-                         milvus::ThreadPoolPriority priority) {
+                         FieldDataChannelPtr channel) {
     try {
         auto rcm = storage::RemoteChunkManagerSingleton::GetInstance()
                        .GetRemoteChunkManager();
-        auto& pool = ThreadPools::GetThreadPool(priority);
+        auto& pool = ThreadPools::GetThreadPool(ThreadPoolPriority::HIGH);
+
         std::vector<std::future<FieldDataPtr>> futures;
         futures.reserve(remote_files.size());
         for (const auto& file : remote_files) {
@@ -923,10 +923,12 @@ LoadFieldDatasFromRemote(const std::vector<std::string>& remote_files,
             });
             futures.emplace_back(std::move(future));
         }
+
         for (auto& future : futures) {
             auto field_data = future.get();
             channel->push(field_data);
         }
+
         channel->close();
     } catch (std::exception& e) {
         LOG_INFO("failed to load data from remote: {}", e.what());
diff --git a/internal/core/src/segcore/Utils.h b/internal/core/src/segcore/Utils.h
index bb77e2fba1..a946fae20b 100644
--- a/internal/core/src/segcore/Utils.h
+++ b/internal/core/src/segcore/Utils.h
@@ -28,7 +28,6 @@
 #include "log/Log.h"
 #include "segcore/DeletedRecord.h"
 #include "segcore/InsertRecord.h"
-#include "storage/ThreadPools.h"
 
 namespace milvus::segcore {
 
@@ -116,16 +115,12 @@ ReverseDataFromIndex(const index::IndexBase* index,
                      const FieldMeta& field_meta);
 
 void
-LoadArrowReaderFromRemote(
-    const std::vector<std::string>& remote_files,
-    std::shared_ptr<ArrowReaderChannel> channel,
-    milvus::ThreadPoolPriority priority = milvus::ThreadPoolPriority::LOW);
+LoadArrowReaderFromRemote(const std::vector<std::string>& remote_files,
+                          std::shared_ptr<ArrowReaderChannel> channel);
 
 void
-LoadFieldDatasFromRemote(
-    const std::vector<std::string>& remote_files,
-    FieldDataChannelPtr channel,
-    milvus::ThreadPoolPriority priority = milvus::ThreadPoolPriority::LOW);
+LoadFieldDatasFromRemote(const std::vector<std::string>& remote_files,
+                         FieldDataChannelPtr channel);
 
 /**
  * Returns an index pointing to the first element in the range [first, last) such that `value < element` is true
 * (i.e. that is strictly greater than value), or last if no such element is found.
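Note the division of labor that replaces the old recovering flag: the per-field orchestration task goes to the MIDDLE pool (SegmentSealedImpl and SegmentGrowingImpl above), while the per-file reads it spawns always run on the HIGH pool inside LoadFieldDatasFromRemote and GetObjectData. Keeping the waiting task and the tasks it waits on in different fixed-size pools avoids the classic self-starvation where all of a pool's threads block on work that can only run in that same pool. A compressed illustration, with a minimal pool standing in for milvus::ThreadPools (an assumption about its internals):

```cpp
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Minimal fixed-size pool, standing in for the Milvus thread pools.
class Pool {
 public:
    explicit Pool(size_t n) {
        for (size_t i = 0; i < n; ++i) workers_.emplace_back([this] { Run(); });
    }
    ~Pool() {
        {
            std::lock_guard<std::mutex> lk(mu_);
            done_ = true;
        }
        cv_.notify_all();
        for (auto& w : workers_) w.join();
    }
    void Submit(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lk(mu_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();
    }

 private:
    void Run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lk(mu_);
                cv_.wait(lk, [&] { return done_ || !tasks_.empty(); });
                if (tasks_.empty()) return;  // done_ and drained
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();
        }
    }
    std::mutex mu_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> tasks_;
    std::vector<std::thread> workers_;
    bool done_ = false;
};

// The orchestration task blocks on a leaf task. If both were queued on the
// same full pool, the wait could never be satisfied; routing the leaf read to
// a *different* pool (HIGH) breaks that cycle, which is what the new code does.
void LoadField(Pool& middle, Pool& high) {
    middle.Submit([&high] {
        std::promise<void> read_done;
        auto fut = read_done.get_future();
        high.Submit([&read_done] { read_done.set_value(); });
        fut.wait();  // safe: waits on another pool's thread
    });
}

int main() {
    Pool high(4), middle(2);
    LoadField(middle, high);
    return 0;  // pools drain and join in their destructors
}
```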
diff --git a/internal/core/src/segcore/load_field_data_c.cpp b/internal/core/src/segcore/load_field_data_c.cpp
index d9d415c64c..ddab5a28f4 100644
--- a/internal/core/src/segcore/load_field_data_c.cpp
+++ b/internal/core/src/segcore/load_field_data_c.cpp
@@ -109,9 +109,3 @@ EnableMmap(CLoadFieldDataInfo c_load_field_data_info,
     auto info = static_cast<LoadFieldDataInfo*>(c_load_field_data_info);
     info->field_infos[field_id].enable_mmap = enabled;
 }
-
-void
-SetRecovering(CLoadFieldDataInfo c_load_field_data_info, bool recovering) {
-    auto info = static_cast<LoadFieldDataInfo*>(c_load_field_data_info);
-    info->recovering = recovering;
-}
diff --git a/internal/core/src/segcore/load_field_data_c.h b/internal/core/src/segcore/load_field_data_c.h
index 9e4a8378fa..0b3b7dee9f 100644
--- a/internal/core/src/segcore/load_field_data_c.h
+++ b/internal/core/src/segcore/load_field_data_c.h
@@ -58,9 +58,6 @@ EnableMmap(CLoadFieldDataInfo c_load_field_data_info,
            int64_t field_id,
            bool enabled);
 
-void
-SetRecovering(CLoadFieldDataInfo c_load_field_data_info, bool recovering);
-
 #ifdef __cplusplus
 }
 #endif
diff --git a/internal/core/src/segcore/load_index_c.cpp b/internal/core/src/segcore/load_index_c.cpp
index f82fc5cee1..b847c4d297 100644
--- a/internal/core/src/segcore/load_index_c.cpp
+++ b/internal/core/src/segcore/load_index_c.cpp
@@ -30,7 +30,6 @@
 #include "pb/cgo_msg.pb.h"
 #include "knowhere/index/index_static.h"
 #include "knowhere/comp/knowhere_check.h"
-#include "storage/ThreadPools.h"
 
 bool
 IsLoadWithDisk(const char* index_type, int index_engine_version) {
@@ -269,14 +268,12 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) {
         milvus::tracer::SetRootSpan(span);
 
         LOG_INFO(
-            "[collection={}][segment={}][field={}][enable_mmap={}][recovering={"
-            "}] load index "
+            "[collection={}][segment={}][field={}][enable_mmap={}] load index "
             "{}",
             load_index_info->collection_id,
             load_index_info->segment_id,
             load_index_info->field_id,
             load_index_info->enable_mmap,
-            load_index_info->recovering,
             load_index_info->index_id);
 
         // get index type
@@ -336,9 +333,6 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) {
             config[milvus::index::ENABLE_MMAP] = "true";
             config[milvus::index::MMAP_FILE_PATH] = filepath.string();
         }
-        config[milvus::THREAD_POOL_PRIORITY] =
-            load_index_info->recovering ? milvus::ThreadPoolPriority::HIGH
-                                        : milvus::ThreadPoolPriority::LOW;
 
         LOG_DEBUG("load index with configs: {}", config.dump());
         load_index_info->index->Load(ctx, config);
@@ -347,14 +341,12 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) {
         milvus::tracer::CloseRootSpan();
 
         LOG_INFO(
-            "[collection={}][segment={}][field={}][enable_mmap={}][recovering={"
-            "}] load index "
+            "[collection={}][segment={}][field={}][enable_mmap={}] load index "
             "{} done",
             load_index_info->collection_id,
             load_index_info->segment_id,
             load_index_info->field_id,
             load_index_info->enable_mmap,
-            load_index_info->recovering,
             load_index_info->index_id);
 
         auto status = CStatus();
@@ -502,7 +494,6 @@ FinishLoadIndexInfo(CLoadIndexInfo c_load_index_info,
             info_proto->index_engine_version();
         load_index_info->schema = info_proto->field();
         load_index_info->index_size = info_proto->index_file_size();
-        load_index_info->recovering = info_proto->recovering();
     }
     auto status = CStatus();
     status.error_code = milvus::Success;
diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp
index 227ea01d9f..e4c002d0f1 100644
--- a/internal/core/src/segcore/segment_c.cpp
+++ b/internal/core/src/segcore/segment_c.cpp
@@ -475,9 +475,7 @@ LoadTextIndex(CSegmentInterface c_segment,
         files.push_back(f);
     }
     config["index_files"] = files;
-    config[milvus::THREAD_POOL_PRIORITY] =
-        info_proto->recovering() ? milvus::ThreadPoolPriority::HIGH
-                                 : milvus::ThreadPoolPriority::LOW;
+
     milvus::storage::FileManagerContext ctx(
         field_meta, index_meta, remote_chunk_manager);
@@ -530,9 +528,6 @@ LoadJsonKeyIndex(CTraceContext c_trace,
         files.push_back(f);
     }
     config["index_files"] = files;
-    config[milvus::THREAD_POOL_PRIORITY] =
-        info_proto->recovering() ? milvus::ThreadPoolPriority::HIGH
-                                 : milvus::ThreadPoolPriority::LOW;
 
     milvus::storage::FileManagerContext file_ctx(
         field_meta, index_meta, remote_chunk_manager);
diff --git a/internal/core/src/storage/DiskFileManagerImpl.cpp b/internal/core/src/storage/DiskFileManagerImpl.cpp
index 191c452920..6713b14930 100644
--- a/internal/core/src/storage/DiskFileManagerImpl.cpp
+++ b/internal/core/src/storage/DiskFileManagerImpl.cpp
@@ -220,8 +220,7 @@ DiskFileManagerImpl::AddBatchIndexFiles(
 void
 DiskFileManagerImpl::CacheIndexToDiskInternal(
     const std::vector<std::string>& remote_files,
-    const std::function<std::string()>& get_local_index_prefix,
-    milvus::ThreadPoolPriority priority) noexcept {
+    const std::function<std::string()>& get_local_index_prefix) noexcept {
     auto local_chunk_manager =
         LocalChunkManagerSingleton::GetInstance().GetChunkManager();
@@ -261,8 +260,7 @@ DiskFileManagerImpl::CacheIndexToDiskInternal(
         uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
 
     auto appendIndexFiles = [&]() {
-        auto index_chunks =
-            GetObjectData(rcm_.get(), batch_remote_files, priority);
+        auto index_chunks = GetObjectData(rcm_.get(), batch_remote_files);
         for (auto& chunk : index_chunks) {
             auto index_data = chunk.get()->GetFieldData();
             auto index_size = index_data->DataSize();
@@ -290,36 +288,28 @@ DiskFileManagerImpl::CacheIndexToDiskInternal(
 void
 DiskFileManagerImpl::CacheIndexToDisk(
-    const std::vector<std::string>& remote_files,
-    milvus::ThreadPoolPriority priority) {
+    const std::vector<std::string>& remote_files) {
     return CacheIndexToDiskInternal(
-        remote_files,
-        [this]() { return GetLocalIndexObjectPrefix(); },
-        priority);
+        remote_files, [this]() { return GetLocalIndexObjectPrefix(); });
 }
 
 void
 DiskFileManagerImpl::CacheTextLogToDisk(
-    const std::vector<std::string>& remote_files,
-    milvus::ThreadPoolPriority priority) {
+    const std::vector<std::string>& remote_files) {
     return CacheIndexToDiskInternal(
-        remote_files, [this]() { return GetLocalTextIndexPrefix(); }, priority);
+        remote_files, [this]() { return GetLocalTextIndexPrefix(); });
 }
 
 void
 DiskFileManagerImpl::CacheJsonKeyIndexToDisk(
-    const std::vector<std::string>& remote_files,
-    milvus::ThreadPoolPriority priority) {
+    const std::vector<std::string>& remote_files) {
     return CacheIndexToDiskInternal(
-        remote_files,
-        [this]() { return GetLocalJsonKeyIndexPrefix(); },
-        priority);
+        remote_files, [this]() { return GetLocalJsonKeyIndexPrefix(); });
 }
 
 template <typename DataType>
 std::string
-DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files,
-                                        milvus::ThreadPoolPriority priority) {
+DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files) {
     SortByPath(remote_files);
 
     auto segment_id = GetFieldDataMeta().segment_id;
@@ -351,7 +341,7 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files,
     int64_t write_offset = sizeof(num_rows) + sizeof(dim);
 
     auto FetchRawData = [&]() {
-        auto field_datas = GetObjectData(rcm_.get(), batch_files, priority);
+        auto field_datas = GetObjectData(rcm_.get(), batch_files);
         int batch_size = batch_files.size();
         for (int i = 0; i < batch_size; ++i) {
             auto field_data = field_datas[i].get()->GetFieldData();
@@ -704,18 +694,14 @@ DiskFileManagerImpl::IsExisted(const std::string& file) noexcept {
 template std::string
 DiskFileManagerImpl::CacheRawDataToDisk<float>(
-    std::vector<std::string> remote_files,
-    milvus::ThreadPoolPriority priority = milvus::ThreadPoolPriority::LOW);
+    std::vector<std::string> remote_files);
 template std::string
 DiskFileManagerImpl::CacheRawDataToDisk<float16>(
-    std::vector<std::string> remote_files,
-    milvus::ThreadPoolPriority priority = milvus::ThreadPoolPriority::LOW);
+    std::vector<std::string> remote_files);
 template std::string
 DiskFileManagerImpl::CacheRawDataToDisk<bfloat16>(
-    std::vector<std::string> remote_files,
-    milvus::ThreadPoolPriority priority = milvus::ThreadPoolPriority::LOW);
+    std::vector<std::string> remote_files);
 template std::string
 DiskFileManagerImpl::CacheRawDataToDisk<bin1>(
-    std::vector<std::string> remote_files,
-    milvus::ThreadPoolPriority priority = milvus::ThreadPoolPriority::LOW);
+    std::vector<std::string> remote_files);
 
 }  // namespace milvus::storage
diff --git a/internal/core/src/storage/DiskFileManagerImpl.h b/internal/core/src/storage/DiskFileManagerImpl.h
index f603e2d3dd..896c2b991e 100644
--- a/internal/core/src/storage/DiskFileManagerImpl.h
+++ b/internal/core/src/storage/DiskFileManagerImpl.h
@@ -26,7 +26,6 @@
 #include "storage/FileManager.h"
 #include "storage/ChunkManager.h"
 #include "common/Consts.h"
-#include "storage/ThreadPools.h"
 
 namespace milvus::storage {
 
@@ -103,19 +102,13 @@ class DiskFileManagerImpl : public FileManagerImpl {
     }
 
     void
-    CacheIndexToDisk(
-        const std::vector<std::string>& remote_files,
-        milvus::ThreadPoolPriority priority = milvus::ThreadPoolPriority::LOW);
+    CacheIndexToDisk(const std::vector<std::string>& remote_files);
 
     void
-    CacheTextLogToDisk(
-        const std::vector<std::string>& remote_files,
-        milvus::ThreadPoolPriority priority = milvus::ThreadPoolPriority::LOW);
+    CacheTextLogToDisk(const std::vector<std::string>& remote_files);
 
     void
-    CacheJsonKeyIndexToDisk(
-        const std::vector<std::string>& remote_files,
-        milvus::ThreadPoolPriority priority = milvus::ThreadPoolPriority::LOW);
+    CacheJsonKeyIndexToDisk(const std::vector<std::string>& remote_files);
 
     void
     AddBatchIndexFiles(const std::string& local_file_name,
@@ -125,9 +118,7 @@ class DiskFileManagerImpl : public FileManagerImpl {
 
     template <typename DataType>
     std::string
-    CacheRawDataToDisk(
-        std::vector<std::string> remote_files,
-        milvus::ThreadPoolPriority priority = milvus::ThreadPoolPriority::LOW);
+    CacheRawDataToDisk(std::vector<std::string> remote_files);
 
     std::string
     CacheOptFieldToDisk(OptFieldT& fields_map);
@@ -168,9 +159,7 @@ class DiskFileManagerImpl : public FileManagerImpl {
     void
     CacheIndexToDiskInternal(
         const std::vector<std::string>& remote_files,
-        const std::function<std::string()>& get_local_index_prefix,
-        milvus::ThreadPoolPriority priority =
-            milvus::ThreadPoolPriority::LOW) noexcept;
+        const std::function<std::string()>& get_local_index_prefix) noexcept;
 
  private:
     // local file path (abs path)
diff --git a/internal/core/src/storage/MemFileManagerImpl.cpp b/internal/core/src/storage/MemFileManagerImpl.cpp
index f0096a7cb7..3f21781bc4 100644
--- a/internal/core/src/storage/MemFileManagerImpl.cpp
+++ b/internal/core/src/storage/MemFileManagerImpl.cpp
@@ -98,15 +98,14 @@ MemFileManagerImpl::LoadFile(const std::string& filename) noexcept {
 
 std::map<std::string, std::unique_ptr<DataCodec>>
 MemFileManagerImpl::LoadIndexToMemory(
-    const std::vector<std::string>& remote_files,
-    milvus::ThreadPoolPriority priority) {
+    const std::vector<std::string>& remote_files) {
     std::map<std::string, std::unique_ptr<DataCodec>> file_to_index_data;
     auto parallel_degree =
         static_cast<uint64_t>(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
     std::vector<std::string> batch_files;
 
     auto LoadBatchIndexFiles = [&]() {
-        auto index_datas = GetObjectData(rcm_.get(), batch_files, priority);
+        auto index_datas = GetObjectData(rcm_.get(), batch_files);
         for (size_t idx = 0; idx < batch_files.size(); ++idx) {
             auto file_name =
                 batch_files[idx].substr(batch_files[idx].find_last_of('/') + 1);
diff --git a/internal/core/src/storage/MemFileManagerImpl.h b/internal/core/src/storage/MemFileManagerImpl.h
index 5f47bd56ad..13bd219ec3 100644
--- a/internal/core/src/storage/MemFileManagerImpl.h
+++ b/internal/core/src/storage/MemFileManagerImpl.h
@@ -26,7 +26,6 @@
 #include "storage/IndexData.h"
 #include "storage/FileManager.h"
 #include "storage/ChunkManager.h"
-#include "storage/ThreadPools.h"
 
 namespace milvus::storage {
 
@@ -53,8 +52,7 @@ class MemFileManagerImpl : public FileManagerImpl {
     }
 
     std::map<std::string, std::unique_ptr<DataCodec>>
-    LoadIndexToMemory(const std::vector<std::string>& remote_files,
-                      milvus::ThreadPoolPriority priority);
+    LoadIndexToMemory(const std::vector<std::string>& remote_files);
 
     std::vector<FieldDataPtr>
     CacheRawDataToMemory(std::vector<std::string> remote_files);
diff --git a/internal/core/src/storage/ThreadPools.h b/internal/core/src/storage/ThreadPools.h
index 9e0b4e2765..a728befabb 100644
--- a/internal/core/src/storage/ThreadPools.h
+++ b/internal/core/src/storage/ThreadPools.h
@@ -22,11 +22,10 @@
 
 namespace milvus {
 
-constexpr const char* THREAD_POOL_PRIORITY = "priority";
-
 enum ThreadPoolPriority {
     HIGH = 0,
-    LOW = 1,
+    MIDDLE = 1,
+    LOW = 2,
 };
 
 class ThreadPools {
@@ -41,14 +40,17 @@ class ThreadPools {
 private:
    ThreadPools() {
        name_map[HIGH] = "high_priority_thread_pool";
+        name_map[MIDDLE] = "middle_priority_thread_pool";
        name_map[LOW] = "low_priority_thread_pool";
    }
    static void
    SetUpCoefficients() {
        coefficient_map[HIGH] = HIGH_PRIORITY_THREAD_CORE_COEFFICIENT;
+        coefficient_map[MIDDLE] = MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
        coefficient_map[LOW] = LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
-        LOG_INFO("Init ThreadPools, high_priority_co={}, low={}",
+        LOG_INFO("Init ThreadPools, high_priority_co={}, middle={}, low={}",
                 HIGH_PRIORITY_THREAD_CORE_COEFFICIENT,
+                 MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT,
                 LOW_PRIORITY_THREAD_CORE_COEFFICIENT);
    }
    void
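Two things are easy to miss in the ThreadPools.h hunk: the string key THREAD_POOL_PRIORITY disappears (priority is no longer smuggled through the JSON config, which is why every Load() signature above shrank), and LOW is renumbered from 1 to 2 to make room for MIDDLE. After this change the numeric values serve only as in-process map keys, as in this small sketch:

```cpp
#include <cassert>
#include <map>
#include <string>

enum ThreadPoolPriority { HIGH = 0, MIDDLE = 1, LOW = 2 };

int main() {
    std::map<ThreadPoolPriority, std::string> name_map;
    name_map[HIGH] = "high_priority_thread_pool";
    name_map[MIDDLE] = "middle_priority_thread_pool";
    name_map[LOW] = "low_priority_thread_pool";
    // Any code that had persisted the old raw value LOW == 1 would now read it
    // back as MIDDLE -- safe here only because the enum never leaves the
    // process once the config key is gone.
    assert(name_map.size() == 3);
    return 0;
}
```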
diff --git a/internal/core/src/storage/Util.cpp b/internal/core/src/storage/Util.cpp
index 042f127967..57a2d44b91 100644
--- a/internal/core/src/storage/Util.cpp
+++ b/internal/core/src/storage/Util.cpp
@@ -639,9 +639,8 @@ EncodeAndUploadFieldSlice(ChunkManager* chunk_manager,
 
 std::vector<std::future<std::unique_ptr<DataCodec>>>
 GetObjectData(ChunkManager* remote_chunk_manager,
-              const std::vector<std::string>& remote_files,
-              milvus::ThreadPoolPriority priority) {
-    auto& pool = ThreadPools::GetThreadPool(priority);
+              const std::vector<std::string>& remote_files) {
+    auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH);
     std::vector<std::future<std::unique_ptr<DataCodec>>> futures;
     futures.reserve(remote_files.size());
     for (auto& file : remote_files) {
@@ -658,7 +657,7 @@ PutIndexData(ChunkManager* remote_chunk_manager,
              const std::vector<int64_t>& slice_sizes,
              const std::vector<std::string>& slice_names,
              FieldDataMeta& field_meta,
              IndexMeta& index_meta) {
-    auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::LOW);
+    auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);
     std::vector<std::future<std::pair<std::string, size_t>>> futures;
     AssertInfo(data_slices.size() == slice_sizes.size(),
                "inconsistent data slices size {} with slice sizes {}",
@@ -859,20 +858,6 @@ GetByteSizeOfFieldDatas(const std::vector<FieldDataPtr>& field_datas) {
     return result;
 }
 
-LoadedFieldDatasInfo
-CollectFieldDataChannelWithInfos(FieldDataChannelPtr& channel) {
-    std::vector<FieldDataPtr> result;
-    FieldDataPtr field_data;
-    int64_t field_data_size = 0;
-    int64_t field_data_rows = 0;
-    while (channel->pop(field_data)) {
-        result.push_back(field_data);
-        field_data_size += field_data->DataSize();
-        field_data_rows += field_data->get_num_rows();
-    }
-    return {result, field_data_size, field_data_rows};
-}
-
 std::vector<FieldDataPtr>
 CollectFieldDataChannel(FieldDataChannelPtr& channel) {
     std::vector<FieldDataPtr> result;
diff --git a/internal/core/src/storage/Util.h b/internal/core/src/storage/Util.h
index ea1db6c9cc..511656220d 100644
--- a/internal/core/src/storage/Util.h
+++ b/internal/core/src/storage/Util.h
@@ -31,7 +31,6 @@
 #include "storage/ChunkManager.h"
 #include "storage/DataCodec.h"
 #include "storage/Types.h"
-#include "storage/ThreadPools.h"
 
 namespace milvus::storage {
 
@@ -140,10 +139,8 @@ EncodeAndUploadFieldSlice(ChunkManager* chunk_manager,
                           std::string object_key);
 
 std::vector<std::future<std::unique_ptr<DataCodec>>>
-GetObjectData(
-    ChunkManager* remote_chunk_manager,
-    const std::vector<std::string>& remote_files,
-    milvus::ThreadPoolPriority priority = milvus::ThreadPoolPriority::LOW);
+GetObjectData(ChunkManager* remote_chunk_manager,
+              const std::vector<std::string>& remote_files);
 
 std::map<std::string, int64_t>
 PutIndexData(ChunkManager* remote_chunk_manager,
@@ -177,15 +174,6 @@ CreateFieldData(const DataType& type,
 int64_t
 GetByteSizeOfFieldDatas(const std::vector<FieldDataPtr>& field_datas);
 
-struct LoadedFieldDatasInfo {
-    std::vector<FieldDataPtr> loaded_field_datas_;
-    int64_t data_size_;
-    int64_t data_rows_;
-};
-
-LoadedFieldDatasInfo
-CollectFieldDataChannelWithInfos(FieldDataChannelPtr& channel);
-
 std::vector<FieldDataPtr>
 CollectFieldDataChannel(FieldDataChannelPtr& channel);
diff --git a/internal/core/unittest/test_array_bitmap_index.cpp b/internal/core/unittest/test_array_bitmap_index.cpp
index 7597d14fb4..66205b1139 100644
--- a/internal/core/unittest/test_array_bitmap_index.cpp
+++ b/internal/core/unittest/test_array_bitmap_index.cpp
@@ -249,7 +249,6 @@ class ArrayBitmapIndexTest : public testing::Test {
         index_info.field_type = DataType::ARRAY;
 
         config["index_files"] = index_files;
-        config[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
 
         ctx.set_for_loading_index(true);
         index_ =
diff --git a/internal/core/unittest/test_bitmap_index.cpp b/internal/core/unittest/test_bitmap_index.cpp
index afe0b31ddb..078546ae86 100644
--- a/internal/core/unittest/test_bitmap_index.cpp
+++ b/internal/core/unittest/test_bitmap_index.cpp
@@ -167,7 +167,6 @@ class BitmapIndexTest : public testing::Test {
                 field_id);
             ;
         }
-        config[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
 
         index_ =
             index::IndexFactory::GetInstance().CreateIndex(index_info, ctx);
         index_->Load(milvus::tracer::TraceContext{}, config);
diff --git a/internal/core/unittest/test_disk_file_manager_test.cpp b/internal/core/unittest/test_disk_file_manager_test.cpp
index e6d58bac5d..565e063b6d 100644
--- a/internal/core/unittest/test_disk_file_manager_test.cpp
+++ b/internal/core/unittest/test_disk_file_manager_test.cpp
@@ -94,8 +94,7 @@ TEST_F(DiskAnnFileManagerTest, AddFilePositiveParallel) {
         std::cout << file2size.first << std::endl;
         remote_files.emplace_back(file2size.first);
     }
-    diskAnnFileManager->CacheIndexToDisk(remote_files,
-                                         milvus::ThreadPoolPriority::HIGH);
+    diskAnnFileManager->CacheIndexToDisk(remote_files);
     auto local_files = diskAnnFileManager->GetLocalFilePaths();
     for (auto& file : local_files) {
         auto file_size = lcm->Size(file);
diff --git a/internal/core/unittest/test_hybrid_index.cpp b/internal/core/unittest/test_hybrid_index.cpp
index 906782c4a4..fb95b2c13f 100644
--- a/internal/core/unittest/test_hybrid_index.cpp
+++ b/internal/core/unittest/test_hybrid_index.cpp
@@ -162,7 +162,6 @@ class HybridIndexTestV1 : public testing::Test {
         index_info.field_type = type_;
 
         config["index_files"] = index_files;
-        config[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
 
         ctx.set_for_loading_index(true);
         index_ =
diff --git a/internal/core/unittest/test_indexing.cpp b/internal/core/unittest/test_indexing.cpp
index ce2c065eca..1b29a22a0c 100644
--- a/internal/core/unittest/test_indexing.cpp
+++ b/internal/core/unittest/test_indexing.cpp
@@ -500,7 +500,6 @@ TEST_P(IndexTest, BuildAndQuery) {
     ASSERT_GT(serializedSize, 0);
     load_conf = generate_load_conf(index_type, metric_type, 0);
     load_conf["index_files"] = index_files;
-    load_conf[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
     ASSERT_NO_THROW(vec_index->Load(milvus::tracer::TraceContext{}, load_conf));
     EXPECT_EQ(vec_index->Count(), NB);
     if (!is_sparse) {
@@ -568,7 +567,6 @@ TEST_P(IndexTest, Mmap) {
     load_conf = generate_load_conf(index_type, metric_type, 0);
     load_conf["index_files"] = index_files;
     load_conf["mmap_filepath"] = "mmap/test_index_mmap_" + index_type;
-    load_conf[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
     vec_index->Load(milvus::tracer::TraceContext{}, load_conf);
     EXPECT_EQ(vec_index->Count(), NB);
     EXPECT_EQ(vec_index->GetDim(), is_sparse ? kTestSparseDim : DIM);
@@ -624,7 +622,6 @@ TEST_P(IndexTest, GetVector) {
     load_conf["index_files"] = index_files;
 
     vec_index = dynamic_cast<index::VectorIndex*>(new_index.get());
-    load_conf[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
     vec_index->Load(milvus::tracer::TraceContext{}, load_conf);
     if (!is_sparse) {
         EXPECT_EQ(vec_index->GetDim(), DIM);
@@ -734,7 +731,6 @@ TEST(Indexing, SearchDiskAnnWithInvalidParam) {
     auto vec_index = dynamic_cast<index::VectorIndex*>(new_index.get());
     auto load_conf = generate_load_conf(index_type, metric_type, NB);
     load_conf["index_files"] = index_files;
-    load_conf[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
     vec_index->Load(milvus::tracer::TraceContext{}, load_conf);
     EXPECT_EQ(vec_index->Count(), NB);
@@ -819,7 +815,6 @@ TEST(Indexing, SearchDiskAnnWithFloat16) {
     auto vec_index = dynamic_cast<index::VectorIndex*>(new_index.get());
     auto load_conf = generate_load_conf(index_type, metric_type, NB);
     load_conf["index_files"] = index_files;
-    load_conf[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
     vec_index->Load(milvus::tracer::TraceContext{}, load_conf);
     EXPECT_EQ(vec_index->Count(), NB);
@@ -903,7 +898,6 @@ TEST(Indexing, SearchDiskAnnWithBFloat16) {
     auto vec_index = dynamic_cast<index::VectorIndex*>(new_index.get());
     auto load_conf = generate_load_conf(index_type, metric_type, NB);
     load_conf["index_files"] = index_files;
-    load_conf[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
     vec_index->Load(milvus::tracer::TraceContext{}, load_conf);
     EXPECT_EQ(vec_index->Count(), NB);
diff --git a/internal/core/unittest/test_inverted_index.cpp b/internal/core/unittest/test_inverted_index.cpp
index 0715be3f54..8da7f2c078 100644
--- a/internal/core/unittest/test_inverted_index.cpp
+++ b/internal/core/unittest/test_inverted_index.cpp
@@ -208,7 +208,7 @@ test_run() {
 
         Config config;
         config["index_files"] = index_files;
-        config[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
+
         ctx.set_for_loading_index(true);
         auto index =
             index::IndexFactory::GetInstance().CreateIndex(index_info, ctx);
@@ -483,7 +483,7 @@ test_string() {
 
         Config config;
         config["index_files"] = index_files;
-        config[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
+
        ctx.set_for_loading_index(true);
        auto index =
            index::IndexFactory::GetInstance().CreateIndex(index_info, ctx);
diff --git a/internal/core/unittest/test_json_key_stats_index.cpp b/internal/core/unittest/test_json_key_stats_index.cpp
index 99470e5901..30f9b18ea9 100644
--- a/internal/core/unittest/test_json_key_stats_index.cpp
+++ b/internal/core/unittest/test_json_key_stats_index.cpp
@@ -126,7 +126,6 @@ class JsonKeyStatsIndexTest : public ::testing::TestWithParam {
         index::CreateIndexInfo index_info{};
 
         config["index_files"] = index_files;
-        config[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
 
         index_ = std::make_shared<JsonKeyStatsInvertedIndex>(ctx, true);
         index_->Load(milvus::tracer::TraceContext{}, config);
diff --git a/internal/core/unittest/test_utils/DataGen.h b/internal/core/unittest/test_utils/DataGen.h
index 12b4d3ea0c..1282f08c28 100644
--- a/internal/core/unittest/test_utils/DataGen.h
+++ b/internal/core/unittest/test_utils/DataGen.h
@@ -1303,7 +1303,6 @@ GenVecIndexing(int64_t N,
     auto create_index_result = indexing->Upload();
     auto index_files = create_index_result->GetIndexFiles();
     conf["index_files"] = index_files;
-    conf[milvus::THREAD_POOL_PRIORITY] = milvus::ThreadPoolPriority::HIGH;
     // we need a load stage to use index as the producation does
     // knowhere would do some data preparation in this stage
     indexing->Load(milvus::tracer::TraceContext{}, conf);
@@ -1440,4 +1439,4 @@ NewCollection(const milvus::proto::schema::CollectionSchema* schema,
     return (void*)collection.release();
 }
 
-}  // namespace milvus::segcore
\ No newline at end of file
+}  // namespace milvus::segcore
diff --git a/internal/core/unittest/test_utils/storage_test_utils.h b/internal/core/unittest/test_utils/storage_test_utils.h
index 541388e594..baa60c7712 100644
--- a/internal/core/unittest/test_utils/storage_test_utils.h
+++ b/internal/core/unittest/test_utils/storage_test_utils.h
@@ -124,7 +124,7 @@ PutFieldData(milvus::storage::ChunkManager* remote_chunk_manager,
              FieldDataMeta& field_data_meta,
              milvus::FieldMeta& field_meta) {
     auto& pool =
-        milvus::ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH);
+        milvus::ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);
     std::vector<std::future<std::pair<std::string, size_t>>> futures;
     AssertInfo(buffers.size() == element_counts.size(),
                "inconsistent size of data slices with slice sizes!");
diff --git a/internal/datacoord/stats_task_meta.go b/internal/datacoord/stats_task_meta.go
index c376b39e1d..11be93105c 100644
--- a/internal/datacoord/stats_task_meta.go
+++ b/internal/datacoord/stats_task_meta.go
@@ -19,10 +19,9 @@ package datacoord
 import (
 	"context"
 	"fmt"
-	"strconv"
 
-	"go.uber.org/zap"
 	"google.golang.org/protobuf/proto"
+	"strconv"
 
 	"github.com/milvus-io/milvus/internal/metastore"
 	"github.com/milvus-io/milvus/pkg/v2/log"
diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go
index a88dd88e36..6b2a7f0643 100644
--- a/internal/indexnode/indexnode.go
+++ b/internal/indexnode/indexnode.go
@@ -178,6 +178,8 @@ func (i *IndexNode) initSegcore() {
 	// set up thread pool for different priorities
 	cHighPriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsInt64())
 	C.InitHighPriorityThreadCoreCoefficient(cHighPriorityThreadCoreCoefficient)
+	cMiddlePriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt64())
+	C.InitMiddlePriorityThreadCoreCoefficient(cMiddlePriorityThreadCoreCoefficient)
 	cLowPriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt64())
 	C.InitLowPriorityThreadCoreCoefficient(cLowPriorityThreadCoreCoefficient)
diff --git a/internal/querycoordv2/balance/balance.go b/internal/querycoordv2/balance/balance.go
index b51938652a..80d0f67d45 100644
--- a/internal/querycoordv2/balance/balance.go
+++ b/internal/querycoordv2/balance/balance.go
@@ -38,7 +38,6 @@ type SegmentAssignPlan struct {
 	FromScore    int64
 	ToScore      int64
 	SegmentScore int64
-	Recovering   bool
 }
 
 func (segPlan *SegmentAssignPlan) String() string {
diff --git a/internal/querycoordv2/balance/utils.go b/internal/querycoordv2/balance/utils.go
index 0dfb21c35e..18b9fd2565 100644
--- a/internal/querycoordv2/balance/utils.go
+++ b/internal/querycoordv2/balance/utils.go
@@ -52,7 +52,6 @@ func CreateSegmentTasksFromPlans(ctx context.Context, source task.Source, timeou
 			source,
 			p.Segment.GetCollectionID(),
 			p.Replica,
-			p.Recovering,
 			actions...,
 		)
 		if err != nil {
@@ -74,7 +73,6 @@ func CreateSegmentTasksFromPlans(ctx context.Context, source task.Source, timeou
 			zap.Int64("replica", p.Replica.GetID()),
 			zap.String("channel", p.Segment.GetInsertChannel()),
 			zap.String("level", p.Segment.GetLevel().String()),
-			zap.Bool("recovering", p.Recovering),
 			zap.Int64("from", p.From),
 			zap.Int64("to", p.To))
 		if task.GetTaskType(t) == task.TaskTypeMove {
diff --git a/internal/querycoordv2/checkers/index_checker.go b/internal/querycoordv2/checkers/index_checker.go
index 44bf275d40..7d97c7492d 100644
--- a/internal/querycoordv2/checkers/index_checker.go
+++ b/internal/querycoordv2/checkers/index_checker.go
@@ -224,7 +224,6 @@ func (c *IndexChecker) createSegmentUpdateTask(ctx context.Context, segment *met
 		c.ID(),
 		segment.GetCollectionID(),
 		replica,
-		false,
 		action,
 	)
 	if err != nil {
@@ -280,7 +279,6 @@ func (c *IndexChecker) createSegmentStatsUpdateTask(ctx context.Context, segment
 		c.ID(),
 		segment.GetCollectionID(),
 		replica,
-		false,
 		action,
 	)
 	if err != nil {
diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go
index 3aeb74b73b..50e638dd35 100644
--- a/internal/querycoordv2/checkers/segment_checker.go
+++ b/internal/querycoordv2/checkers/segment_checker.go
@@ -92,11 +92,7 @@ func (c *SegmentChecker) Check(ctx context.Context) []task.Task {
 		if c.readyToCheck(ctx, cid) {
 			replicas := c.meta.ReplicaManager.GetByCollection(ctx, cid)
 			for _, r := range replicas {
-				replicaTasks, finishRecover := c.checkReplica(ctx, r)
-				results = append(results, replicaTasks...)
-				if r.IsRecovering() && finishRecover {
-					c.meta.ReplicaManager.FinishRecoverReplica(r.GetID())
-				}
+				results = append(results, c.checkReplica(ctx, r)...)
 			}
 		}
 	}
@@ -128,13 +124,13 @@ func (c *SegmentChecker) Check(ctx context.Context) []task.Task {
 	return results
 }
 
-func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica) ([]task.Task, bool) {
+func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica) []task.Task {
 	ret := make([]task.Task, 0)
 
 	// compare with targets to find the lack and redundancy of segments
-	lacks, recovering, redundancies, finishRecover := c.getSealedSegmentDiff(ctx, replica.GetCollectionID(), replica)
+	lacks, redundancies := c.getSealedSegmentDiff(ctx, replica.GetCollectionID(), replica.GetID())
 	// loadCtx := trace.ContextWithSpan(context.Background(), c.meta.GetCollection(replica.CollectionID).LoadSpan)
-	tasks := c.createSegmentLoadTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), lacks, recovering, replica)
+	tasks := c.createSegmentLoadTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), lacks, replica)
 	task.SetReason("lacks of segment", tasks...)
 	task.SetPriority(task.TaskPriorityNormal, tasks...)
 	ret = append(ret, tasks...)
@@ -161,7 +157,7 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica
 	task.SetPriority(task.TaskPriorityNormal, tasks...)
 	ret = append(ret, tasks...)
 
-	return ret, finishRecover
+	return ret
 }
 
 // GetGrowingSegmentDiff get streaming segment diff between leader view and target
@@ -233,8 +229,9 @@ func (c *SegmentChecker) getGrowingSegmentDiff(ctx context.Context, collectionID
 func (c *SegmentChecker) getSealedSegmentDiff(
 	ctx context.Context,
 	collectionID int64,
-	replica *meta.Replica,
-) (toLoad []*datapb.SegmentInfo, recovering []bool, toRelease []*meta.Segment, finishRecover bool) {
+	replicaID int64,
+) (toLoad []*datapb.SegmentInfo, toRelease []*meta.Segment) {
+	replica := c.meta.Get(ctx, replicaID)
 	if replica == nil {
 		log.Info("replica does not exist, skip it")
 		return
 	}
@@ -285,31 +282,26 @@ func (c *SegmentChecker) getSealedSegmentDiff(
 	nextTargetExist := c.targetMgr.IsNextTargetExist(ctx, collectionID)
 	nextTargetMap := c.targetMgr.GetSealedSegmentsByCollection(ctx, collectionID, meta.NextTarget)
 	currentTargetMap := c.targetMgr.GetSealedSegmentsByCollection(ctx, collectionID, meta.CurrentTarget)
-	finishRecover = currentTargetMap != nil || nextTargetMap != nil
-	// l0 Segment which exist on current target, but not on dist
-	for _, segment := range currentTargetMap {
-		if isSegmentLack(segment) {
-			toLoad = append(toLoad, segment)
-			recovering = append(recovering, true)
-			finishRecover = false
-			// for segments lacked due to node down, we need to load it under recovering mode
-		}
-	}
 
 	// Segment which exist on next target, but not on dist
 	for _, segment := range nextTargetMap {
-		// to avoid generate duplicate segment task
-		if currentTargetMap[segment.ID] != nil {
-			continue
-		}
 		if isSegmentLack(segment) {
 			toLoad = append(toLoad, segment)
-			recovering = append(recovering, replica.IsRecovering())
-			finishRecover = finishRecover && !replica.IsRecovering()
-			// for segments lacked due to normal target advance, we load them under normal mode
-			// for segments lacked due to whole instance restart, we load them under recover mode
 		}
 	}
+
+	// l0 Segment which exist on current target, but not on dist
+	for _, segment := range currentTargetMap {
+		// to avoid generate duplicate segment task
+		if nextTargetMap[segment.ID] != nil {
+			continue
+		}
+
+		if isSegmentLack(segment) {
+			toLoad = append(toLoad, segment)
+		}
+	}
+
 	// get segment which exist on dist, but not on current target and next target
 	for _, segment := range dist {
 		_, existOnCurrent := currentTargetMap[segment.GetID()]
@@ -329,9 +321,8 @@ func (c *SegmentChecker) getSealedSegmentDiff(
 	// to make sure all L0 delta logs will be delivered to the other segments.
 	if len(level0Segments) > 0 {
 		toLoad = level0Segments
-		recovering = nil
-		// for any l0 segments, we try to load them as soon as possible
 	}
+
 	return
 }
@@ -418,26 +409,10 @@ func (c *SegmentChecker) filterSegmentInUse(ctx context.Context, replica *meta.R
 	return filtered
 }
 
-func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []*datapb.SegmentInfo, recovering []bool, replica *meta.Replica) []task.Task {
+func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []*datapb.SegmentInfo, replica *meta.Replica) []task.Task {
 	if len(segments) == 0 {
 		return nil
 	}
-	// set up recover map
-	if recovering != nil && len(recovering) != len(segments) {
-		// this branch should never be reached
-		log.Warn("Recovering slice has different size with segment size, this should never happen",
-			zap.Int("recover_len", len(recovering)),
-			zap.Int("segments_len", len(segments)))
-		return nil
-	}
-	recoverMap := make(map[int64]bool, len(segments))
-	for i, segment := range segments {
-		if recovering == nil {
-			recoverMap[segment.GetID()] = true
-		} else {
-			recoverMap[segment.GetID()] = recovering[i]
-		}
-	}
 
 	isLevel0 := segments[0].GetLevel() == datapb.SegmentLevel_L0
 	shardSegments := lo.GroupBy(segments, func(s *datapb.SegmentInfo) string {
@@ -470,7 +445,6 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []
 		shardPlans := c.getBalancerFunc().AssignSegment(ctx, replica.GetCollectionID(), segmentInfos, rwNodes, true)
 		for i := range shardPlans {
 			shardPlans[i].Replica = replica
-			shardPlans[i].Recovering = recoverMap[shardPlans[i].Segment.GetID()]
 		}
 		plans = append(plans, shardPlans...)
 	}
@@ -488,7 +462,6 @@ func (c *SegmentChecker) createSegmentReduceTasks(ctx context.Context, segments
 			c.ID(),
 			s.GetCollectionID(),
 			replica,
-			false,
 			action,
 		)
 		if err != nil {
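With the recovering bookkeeping gone, getSealedSegmentDiff reduces to two set differences plus an L0 fast path: everything missing from the next target is loaded, plus any current-target segment that is neither in the next target nor already distributed, and if any L0 segments are lacking they preempt the rest. A C++ paraphrase of that selection (the Go code above is authoritative; `distributed` is a stand-in for the dist/leader-view check behind isSegmentLack, and the types are simplified):

```cpp
#include <cstdint>
#include <unordered_map>
#include <vector>

struct SegmentInfo {
    int64_t id;
    bool is_l0;
};

std::vector<SegmentInfo>
SealedSegmentsToLoad(const std::unordered_map<int64_t, SegmentInfo>& next_target,
                     const std::unordered_map<int64_t, SegmentInfo>& current_target,
                     const std::unordered_map<int64_t, bool>& distributed) {
    std::vector<SegmentInfo> to_load;
    auto lacking = [&](int64_t id) { return distributed.count(id) == 0; };

    // 1. anything in the next target but not yet distributed
    for (const auto& [id, seg] : next_target) {
        if (lacking(id)) to_load.push_back(seg);
    }
    // 2. current-target leftovers, skipping ids already covered by step 1
    for (const auto& [id, seg] : current_target) {
        if (next_target.count(id) == 0 && lacking(id)) to_load.push_back(seg);
    }
    // 3. L0 segments jump the queue: if any are lacking, load only those first,
    //    mirroring `if len(level0Segments) > 0 { toLoad = level0Segments }`
    std::vector<SegmentInfo> l0;
    for (const auto& seg : to_load) {
        if (seg.is_l0) l0.push_back(seg);
    }
    if (!l0.empty()) return l0;
    return to_load;
}
```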
checker.meta.ResourceManager.HandleNodeUp(ctx, 1) - checker.meta.ResourceManager.HandleNodeUp(ctx, 2) - - // set target - segments := []*datapb.SegmentInfo{ - { - ID: 1, - PartitionID: 1, - InsertChannel: "test-insert-channel", - }, - } - - channels := []*datapb.VchannelInfo{ - { - CollectionID: 1, - ChannelName: "test-insert-channel", - }, - } - - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( - channels, segments, nil) - checker.targetMgr.UpdateCollectionNextTarget(ctx, int64(1)) - - // set dist - checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) - checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})) - - tasks := checker.Check(context.TODO()) - suite.Len(tasks, 1) - suite.Len(tasks[0].Actions(), 1) - action, ok := tasks[0].Actions()[0].(*task.SegmentAction) - suite.True(ok) - suite.EqualValues(1, tasks[0].ReplicaID()) - suite.Equal(task.ActionTypeGrow, action.Type()) - suite.EqualValues(1, action.GetSegmentID()) - suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) - // generated task should be run under recovering mode - // for recovering replica, recovering segment is added based on next target - suite.True(tasks[0].(*task.SegmentTask).Recovering()) - - // update dist for loading finished - segmentsInfos := make([]*meta.Segment, 0) - segmentsInfos = append(segmentsInfos, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel")) - checker.dist.SegmentDistManager.Update(2, segmentsInfos...) - tasks = checker.Check(context.TODO()) - suite.Len(tasks, 0) - newReplica := checker.meta.ReplicaManager.Get(context.TODO(), 1) - suite.False(newReplica.IsRecovering()) // Recover should be finished -} - -func (suite *SegmentCheckerTestSuite) TestRecoverQueryNodes() { - ctx := context.Background() - checker := suite.checker - // set meta - checker.meta.CollectionManager.PutCollection(ctx, utils.CreateTestCollection(1, 1)) - checker.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(1, 1)) - // set up a normal running replica, not recovering - checker.meta.ReplicaManager.Put(ctx, utils.CreateTestReplica(1, 1, []int64{1, 2})) - // set up nodes - suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ - NodeID: 1, - Address: "localhost", - Hostname: "localhost", - })) - suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ - NodeID: 2, - Address: "localhost", - Hostname: "localhost", - })) - checker.meta.ResourceManager.HandleNodeUp(ctx, 1) - checker.meta.ResourceManager.HandleNodeUp(ctx, 2) - - // set target - segments := []*datapb.SegmentInfo{ - { - ID: 1, - PartitionID: 1, - InsertChannel: "test-insert-channel", - }, - } - - channels := []*datapb.VchannelInfo{ - { - CollectionID: 1, - ChannelName: "test-insert-channel", - }, - } - - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( - channels, segments, nil) - checker.targetMgr.UpdateCollectionNextTarget(ctx, int64(1)) - checker.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(1)) - - // set dist - checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) - checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})) - - tasks := checker.Check(context.TODO()) - suite.Len(tasks, 1) - suite.Len(tasks[0].Actions(), 1) - action, ok := tasks[0].Actions()[0].(*task.SegmentAction) - suite.True(ok) - 
suite.EqualValues(1, tasks[0].ReplicaID()) - suite.Equal(task.ActionTypeGrow, action.Type()) - suite.EqualValues(1, action.GetSegmentID()) - suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) - // generated task should also be run under recovering mode - // for normal replica, recovering segment is added based on current target - suite.True(tasks[0].(*task.SegmentTask).Recovering()) -} - func (suite *SegmentCheckerTestSuite) TestLoadL0Segments() { ctx := context.Background() checker := suite.checker @@ -370,7 +228,6 @@ func (suite *SegmentCheckerTestSuite) TestLoadL0Segments() { suite.EqualValues(1, action.GetSegmentID()) suite.EqualValues(2, action.Node()) suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) - suite.True(tasks[0].(*task.SegmentTask).Recovering()) checker.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(1)) // test load l0 segments in current target @@ -384,7 +241,6 @@ func (suite *SegmentCheckerTestSuite) TestLoadL0Segments() { suite.EqualValues(1, action.GetSegmentID()) suite.EqualValues(2, action.Node()) suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) - suite.True(tasks[0].(*task.SegmentTask).Recovering()) // seg l0 segment exist on a non delegator node checker.targetMgr.UpdateCollectionNextTarget(ctx, int64(1)) @@ -400,102 +256,6 @@ func (suite *SegmentCheckerTestSuite) TestLoadL0Segments() { suite.EqualValues(1, action.GetSegmentID()) suite.EqualValues(2, action.Node()) suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) - suite.True(tasks[0].(*task.SegmentTask).Recovering()) -} - -func (suite *SegmentCheckerTestSuite) TestRecoverL0Segments() { - ctx := context.Background() - checker := suite.checker - // set meta - checker.meta.CollectionManager.PutCollection(ctx, utils.CreateTestCollection(1, 1)) - checker.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(1, 1)) - nodes := []int64{1, 2} - replica := meta.NewRecoveringReplica( - &querypb.Replica{ - ID: 1, - CollectionID: 1, - Nodes: nodes, - ResourceGroup: meta.DefaultResourceGroupName, - }, - typeutil.NewUniqueSet(nodes...), - ) - checker.meta.ReplicaManager.Put(ctx, replica) - suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ - NodeID: 1, - Address: "localhost", - Hostname: "localhost", - Version: common.Version, - })) - suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ - NodeID: 2, - Address: "localhost", - Hostname: "localhost", - Version: common.Version, - })) - checker.meta.ResourceManager.HandleNodeUp(ctx, 1) - checker.meta.ResourceManager.HandleNodeUp(ctx, 2) - - // set target - segments := []*datapb.SegmentInfo{ - { - ID: 1, - PartitionID: 1, - InsertChannel: "test-insert-channel", - Level: datapb.SegmentLevel_L0, - }, - { - ID: 2, - PartitionID: 1, - InsertChannel: "test-insert-channel", - Level: datapb.SegmentLevel_L1, - }, - } - - channels := []*datapb.VchannelInfo{ - { - CollectionID: 1, - ChannelName: "test-insert-channel", - }, - } - - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( - channels, segments, nil) - checker.targetMgr.UpdateCollectionNextTarget(ctx, int64(1)) - - // set dist - checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) - checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})) - - // test load l0 segments in next target - tasks := checker.Check(context.TODO()) - suite.Len(tasks, 1) - suite.Len(tasks[0].Actions(), 1) - action, ok := 
tasks[0].Actions()[0].(*task.SegmentAction) - suite.True(ok) - suite.EqualValues(1, tasks[0].ReplicaID()) - suite.Equal(task.ActionTypeGrow, action.Type()) - suite.EqualValues(1, action.GetSegmentID()) - suite.EqualValues(2, action.Node()) - suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) - suite.True(tasks[0].(*task.SegmentTask).Recovering()) - - checker.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(1)) - // test load l0 segments in current target - tasks = checker.Check(context.TODO()) - suite.Len(tasks, 1) - suite.Len(tasks[0].Actions(), 1) - action, ok = tasks[0].Actions()[0].(*task.SegmentAction) - suite.True(ok) - suite.EqualValues(1, tasks[0].ReplicaID()) - suite.Equal(task.ActionTypeGrow, action.Type()) - suite.EqualValues(1, action.GetSegmentID()) - suite.EqualValues(2, action.Node()) - suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) - suite.True(tasks[0].(*task.SegmentTask).Recovering()) - - wrongRecovering := make([]bool, 1) - wrongTasks := checker.createSegmentLoadTasks(ctx, segments, wrongRecovering, replica) - suite.Nil(wrongTasks) } func (suite *SegmentCheckerTestSuite) TestReleaseL0Segments() { diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go index 062dddf972..e13eb273eb 100644 --- a/internal/querycoordv2/handlers.go +++ b/internal/querycoordv2/handlers.go @@ -132,7 +132,6 @@ func (s *Server) balanceSegments(ctx context.Context, utils.ManualBalance, collectionID, plan.Replica, - false, actions..., ) if err != nil { diff --git a/internal/querycoordv2/meta/replica.go b/internal/querycoordv2/meta/replica.go index fed25c7247..bee254aebd 100644 --- a/internal/querycoordv2/meta/replica.go +++ b/internal/querycoordv2/meta/replica.go @@ -25,7 +25,6 @@ type Replica struct { // always keep consistent with replicaPB.RoNodes. // node used by replica but cannot add more channel or segment ont it. // include rebalance node or node out of resource group. - recovering bool } // Deprecated: may break the consistency of ReplicaManager, use `Spawn` of `ReplicaManager` or `newReplica` instead. @@ -39,17 +38,6 @@ func NewReplica(replica *querypb.Replica, nodes ...typeutil.UniqueSet) *Replica return newReplica(r) } -// Deprecated: may break the consistency of ReplicaManager, use `Spawn` of `ReplicaManager` or `newReplica` instead. -func NewRecoveringReplica(replica *querypb.Replica, nodes ...typeutil.UniqueSet) *Replica { - r := proto.Clone(replica).(*querypb.Replica) - // TODO: nodes is a bad parameter, break the consistency, should be removed in future. - // keep it for old unittest. - if len(nodes) > 0 && len(replica.Nodes) == 0 && nodes[0].Len() > 0 { - r.Nodes = nodes[0].Collect() - } - return newRecoveringReplica(r) -} - // newReplica creates a new replica from pb. func newReplica(replica *querypb.Replica) *Replica { return &Replica{ @@ -59,19 +47,6 @@ func newReplica(replica *querypb.Replica) *Replica { } } -func newRecoveringReplica(replica *querypb.Replica) *Replica { - return &Replica{ - replicaPB: proto.Clone(replica).(*querypb.Replica), - rwNodes: typeutil.NewUniqueSet(replica.Nodes...), - roNodes: typeutil.NewUniqueSet(replica.RoNodes...), - recovering: true, - } -} - -func (replica *Replica) IsRecovering() bool { - return replica.recovering -} - // GetID returns the id of the replica. 
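// Editor's note: the CopyForWrite hunk below keeps the replica's copy-on-write
// pattern intact while dropping the recovering flag: clone the backing state,
// rebuild the derived node sets, hand out a mutable copy, and publish it back
// when done. A minimal self-contained sketch of that pattern, with plain Go
// types standing in for the protobuf-backed Replica (all names hypothetical):
package sketch

type replica struct {
	id    int64
	nodes map[int64]bool // derived set, rebuilt on every copy
}

type mutableReplica struct{ *replica }

// copyForWrite returns a deep copy whose derived set is rebuilt from scratch,
// so mutations never alias the published, read-only replica.
func (r *replica) copyForWrite() *mutableReplica {
	nodes := make(map[int64]bool, len(r.nodes))
	for n := range r.nodes {
		nodes[n] = true
	}
	return &mutableReplica{&replica{id: r.id, nodes: nodes}}
}

// intoReplica publishes the mutated copy as the new immutable view.
func (m *mutableReplica) intoReplica() *replica { return m.replica }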
func (replica *Replica) GetID() typeutil.UniqueID { return replica.replicaPB.GetID() @@ -176,10 +151,9 @@ func (replica *Replica) CopyForWrite() *mutableReplica { return &mutableReplica{ Replica: &Replica{ - replicaPB: proto.Clone(replica.replicaPB).(*querypb.Replica), - rwNodes: typeutil.NewUniqueSet(replica.replicaPB.Nodes...), - roNodes: typeutil.NewUniqueSet(replica.replicaPB.RoNodes...), - recovering: replica.IsRecovering(), + replicaPB: proto.Clone(replica.replicaPB).(*querypb.Replica), + rwNodes: typeutil.NewUniqueSet(replica.replicaPB.Nodes...), + roNodes: typeutil.NewUniqueSet(replica.replicaPB.RoNodes...), }, exclusiveRWNodeToChannel: exclusiveRWNodeToChannel, } diff --git a/internal/querycoordv2/meta/replica_manager.go b/internal/querycoordv2/meta/replica_manager.go index f7ba77449c..fdd867aa30 100644 --- a/internal/querycoordv2/meta/replica_manager.go +++ b/internal/querycoordv2/meta/replica_manager.go @@ -94,8 +94,7 @@ func (m *ReplicaManager) Recover(ctx context.Context, collections []int64) error } if collectionSet.Contain(replica.GetCollectionID()) { - rep := newRecoveringReplica(replica) - m.putReplicaInMemory(rep) + m.putReplicaInMemory(newReplica(replica)) log.Info("recover replica", zap.Int64("collectionID", replica.GetCollectionID()), zap.Int64("replicaID", replica.GetID()), @@ -298,15 +297,6 @@ func (m *ReplicaManager) RemoveCollection(ctx context.Context, collectionID type return nil } -func (m *ReplicaManager) FinishRecoverReplica(replicaId typeutil.UniqueID) { - m.rwmutex.Lock() - defer m.rwmutex.Unlock() - rep := m.replicas[replicaId] - mutableRep := rep.CopyForWrite() - mutableRep.recovering = false - m.putReplicaInMemory(mutableRep.IntoReplica()) -} - func (m *ReplicaManager) RemoveReplicas(ctx context.Context, collectionID typeutil.UniqueID, replicas ...typeutil.UniqueID) error { m.rwmutex.Lock() defer m.rwmutex.Unlock() diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index 030fb11552..faca7776fb 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -196,7 +196,6 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error { if err != nil { return err } - loadInfo.Recovering = task.Recovering() req := packLoadSegmentRequest( task, diff --git a/internal/querycoordv2/task/task.go b/internal/querycoordv2/task/task.go index e02d59d908..6fcbfb3eba 100644 --- a/internal/querycoordv2/task/task.go +++ b/internal/querycoordv2/task/task.go @@ -324,8 +324,7 @@ func (task *baseTask) Name() string { type SegmentTask struct { *baseTask - segmentID typeutil.UniqueID - recovering bool + segmentID typeutil.UniqueID } // NewSegmentTask creates a SegmentTask with actions, @@ -336,7 +335,6 @@ func NewSegmentTask(ctx context.Context, source Source, collectionID typeutil.UniqueID, replica *meta.Replica, - recovering bool, actions ...Action, ) (*SegmentTask, error) { if len(actions) == 0 { @@ -361,16 +359,11 @@ func NewSegmentTask(ctx context.Context, base := newBaseTask(ctx, source, collectionID, replica, shard, fmt.Sprintf("SegmentTask-%s-%d", actions[0].Type().String(), segmentID)) base.actions = actions return &SegmentTask{ - baseTask: base, - segmentID: segmentID, - recovering: recovering, + baseTask: base, + segmentID: segmentID, }, nil } -func (task *SegmentTask) Recovering() bool { - return task.recovering -} - func (task *SegmentTask) SegmentID() typeutil.UniqueID { return task.segmentID } @@ -384,7 +377,7 @@ func (task *SegmentTask) Name() string { } func (task *SegmentTask) 
String() string { - return fmt.Sprintf("%s [segmentID=%d][recovering=%t]", task.baseTask.String(), task.segmentID, task.recovering) + return fmt.Sprintf("%s [segmentID=%d]", task.baseTask.String(), task.segmentID) } func (task *SegmentTask) MarshalJSON() ([]byte, error) { diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index 6bab9f7b3d..f2a830fcb3 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -459,7 +459,6 @@ func (suite *TaskSuite) TestLoadSegmentTask() { WrapIDSource(0), suite.collection, suite.replica, - false, NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment), ) suite.NoError(err) @@ -561,7 +560,6 @@ func (suite *TaskSuite) TestLoadSegmentTaskNotIndex() { WrapIDSource(0), suite.collection, suite.replica, - false, NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment), ) suite.NoError(err) @@ -656,7 +654,6 @@ func (suite *TaskSuite) TestLoadSegmentTaskFailed() { WrapIDSource(0), suite.collection, suite.replica, - false, NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment), ) suite.NoError(err) @@ -722,7 +719,6 @@ func (suite *TaskSuite) TestReleaseSegmentTask() { WrapIDSource(0), suite.collection, suite.replica, - false, NewSegmentAction(targetNode, ActionTypeReduce, channel.GetChannelName(), segment), ) suite.NoError(err) @@ -767,7 +763,6 @@ func (suite *TaskSuite) TestReleaseGrowingSegmentTask() { WrapIDSource(0), suite.collection, suite.replica, - false, NewSegmentActionWithScope(targetNode, ActionTypeReduce, "", segment, querypb.DataScope_Streaming, 0), ) suite.NoError(err) @@ -864,7 +859,6 @@ func (suite *TaskSuite) TestMoveSegmentTask() { WrapIDSource(0), suite.collection, suite.replica, - false, NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment), NewSegmentAction(sourceNode, ActionTypeReduce, channel.GetChannelName(), segment), ) @@ -949,7 +943,6 @@ func (suite *TaskSuite) TestMoveSegmentTaskStale() { WrapIDSource(0), suite.collection, suite.replica, - false, NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment), NewSegmentAction(sourceNode, ActionTypeReduce, channel.GetChannelName(), segment), ) @@ -1028,7 +1021,6 @@ func (suite *TaskSuite) TestTaskCanceled() { WrapIDSource(0), suite.collection, suite.replica, - false, NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment), ) suite.NoError(err) @@ -1114,7 +1106,6 @@ func (suite *TaskSuite) TestSegmentTaskStale() { WrapIDSource(0), suite.collection, suite.replica, - false, NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment), ) suite.NoError(err) @@ -1335,19 +1326,19 @@ func (suite *TaskSuite) TestCreateTaskBehavior() { suite.ErrorIs(err, merr.ErrParameterInvalid) suite.Nil(chanelTask) - segmentTask, err := NewSegmentTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, meta.NilReplica, false) + segmentTask, err := NewSegmentTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, meta.NilReplica) suite.ErrorIs(err, merr.ErrParameterInvalid) suite.Nil(segmentTask) channelAction := NewChannelAction(0, 0, "fake-channel1") - segmentTask, err = NewSegmentTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, meta.NilReplica, false, channelAction) + segmentTask, err = NewSegmentTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, meta.NilReplica, channelAction) suite.ErrorIs(err, merr.ErrParameterInvalid) suite.Nil(segmentTask) segmentAction1 
:= NewSegmentAction(0, 0, "", 0) segmentAction2 := NewSegmentAction(0, 0, "", 1) - segmentTask, err = NewSegmentTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, meta.NilReplica, false, segmentAction1, segmentAction2) + segmentTask, err = NewSegmentTask(context.TODO(), 5*time.Second, WrapIDSource(0), 0, meta.NilReplica, segmentAction1, segmentAction2) suite.ErrorIs(err, merr.ErrParameterInvalid) suite.Nil(segmentTask) @@ -1368,7 +1359,6 @@ func (suite *TaskSuite) TestSegmentTaskReplace() { WrapIDSource(0), suite.collection, suite.replica, - false, NewSegmentAction(targetNode, ActionTypeGrow, "", segment), ) suite.NoError(err) @@ -1386,7 +1376,6 @@ func (suite *TaskSuite) TestSegmentTaskReplace() { WrapIDSource(0), suite.collection, suite.replica, - false, NewSegmentAction(targetNode, ActionTypeGrow, "", segment), ) suite.NoError(err) @@ -1406,7 +1395,6 @@ func (suite *TaskSuite) TestSegmentTaskReplace() { WrapIDSource(0), suite.collection, suite.replica, - false, NewSegmentAction(targetNode, ActionTypeGrow, "", segment), ) suite.NoError(err) @@ -1447,7 +1435,6 @@ func (suite *TaskSuite) TestNoExecutor() { WrapIDSource(0), suite.collection, suite.replica, - false, NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment), ) suite.NoError(err) @@ -1802,7 +1789,6 @@ func (suite *TaskSuite) TestGetTasksJSON() { WrapIDSource(0), suite.collection, suite.replica, - false, NewSegmentAction(1, ActionTypeGrow, "", 1), ) suite.NoError(err) @@ -1844,7 +1830,6 @@ func (suite *TaskSuite) TestCalculateTaskDelta() { WrapIDSource(0), coll, suite.replica, - false, NewSegmentActionWithScope(nodeID, ActionTypeGrow, "", segmentID, querypb.DataScope_Historical, 100), ) task1.SetID(1) @@ -1874,7 +1859,6 @@ func (suite *TaskSuite) TestCalculateTaskDelta() { WrapIDSource(0), coll2, suite.replica, - false, NewSegmentActionWithScope(nodeID2, ActionTypeGrow, "", segmentID2, querypb.DataScope_Historical, 100), ) suite.NoError(err) @@ -1994,7 +1978,6 @@ func (suite *TaskSuite) TestRemoveTaskWithError() { WrapIDSource(0), coll, suite.replica, - false, NewSegmentActionWithScope(nodeID, ActionTypeGrow, "", 1, querypb.DataScope_Historical, 100), ) suite.NoError(err) diff --git a/internal/querycoordv2/task/utils_test.go b/internal/querycoordv2/task/utils_test.go index 8d0cfdbaa8..56869c26df 100644 --- a/internal/querycoordv2/task/utils_test.go +++ b/internal/querycoordv2/task/utils_test.go @@ -44,7 +44,6 @@ func (s *UtilsSuite) TestPackLoadSegmentRequest() { nil, 1, newReplicaDefaultRG(10), - false, action, ) s.NoError(err) @@ -100,7 +99,6 @@ func (s *UtilsSuite) TestPackLoadSegmentRequestMmap() { nil, 1, newReplicaDefaultRG(10), - false, action, ) s.NoError(err) diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 35089eb427..98b70aa979 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -822,8 +822,7 @@ func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCoun Field: field, EnableMMap: mmapEnabled, }}, - RowCount: rowCount, - Recovering: s.loadInfo.Load().GetRecovering(), + RowCount: rowCount, } GetLoadPool().Submit(func() (any, error) { @@ -952,7 +951,7 @@ func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.Del func GetCLoadInfoWithFunc(ctx context.Context, fieldSchema *schemapb.FieldSchema, - loadInfo *querypb.SegmentLoadInfo, + s *querypb.SegmentLoadInfo, indexInfo *querypb.FieldIndexInfo, f func(c *LoadIndexInfo) error, ) error { @@ -986,9 
+985,9 @@ func GetCLoadInfoWithFunc(ctx context.Context, enableMmap := isIndexMmapEnable(fieldSchema, indexInfo) indexInfoProto := &cgopb.LoadIndexInfo{ - CollectionID: loadInfo.GetCollectionID(), - PartitionID: loadInfo.GetPartitionID(), - SegmentID: loadInfo.GetSegmentID(), + CollectionID: s.GetCollectionID(), + PartitionID: s.GetPartitionID(), + SegmentID: s.GetSegmentID(), Field: fieldSchema, EnableMmap: enableMmap, MmapDirPath: paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue(), @@ -1000,7 +999,6 @@ func GetCLoadInfoWithFunc(ctx context.Context, IndexEngineVersion: indexInfo.GetCurrentIndexVersion(), IndexStoreVersion: indexInfo.GetIndexStoreVersion(), IndexFileSize: indexInfo.GetIndexSize(), - Recovering: loadInfo.GetRecovering(), } // 2. @@ -1141,7 +1139,6 @@ func (s *LocalSegment) LoadTextIndex(ctx context.Context, textLogs *datapb.TextI Schema: f, CollectionID: s.Collection(), PartitionID: s.Partition(), - Recovering: s.LoadInfo().GetRecovering(), } marshaled, err := proto.Marshal(cgoProto) @@ -1188,7 +1185,6 @@ func (s *LocalSegment) LoadJSONKeyIndex(ctx context.Context, jsonKeyStats *datap Schema: f, CollectionID: s.Collection(), PartitionID: s.Partition(), - Recovering: s.loadInfo.Load().GetRecovering(), } marshaled, err := proto.Marshal(cgoProto) diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 3533883631..5c7bcc06d1 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -879,8 +879,7 @@ func (loader *segmentLoader) LoadSegment(ctx context.Context, log.Info("start loading segment files", zap.Int64("rowNum", loadInfo.GetNumOfRows()), - zap.String("segmentType", segment.Type().String()), - zap.Bool("isRecovering", loadInfo.GetRecovering())) + zap.String("segmentType", segment.Type().String())) collection := loader.manager.Collection.Get(segment.Collection()) if collection == nil { diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 69c9c2d1f7..b5bfc16b54 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -214,6 +214,8 @@ func (node *QueryNode) InitSegcore() error { // set up thread pool for different priorities cHighPriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsInt64()) C.InitHighPriorityThreadCoreCoefficient(cHighPriorityThreadCoreCoefficient) + cMiddlePriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt64()) + C.InitMiddlePriorityThreadCoreCoefficient(cMiddlePriorityThreadCoreCoefficient) cLowPriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt64()) C.InitLowPriorityThreadCoreCoefficient(cLowPriorityThreadCoreCoefficient) diff --git a/internal/util/segcore/requests.go b/internal/util/segcore/requests.go index 19218706fb..42eca2c6e1 100644 --- a/internal/util/segcore/requests.go +++ b/internal/util/segcore/requests.go @@ -34,10 +34,9 @@ type DeleteRequest struct { } type LoadFieldDataRequest struct { - Fields []LoadFieldDataInfo - MMapDir string - RowCount int64 - Recovering bool + Fields []LoadFieldDataInfo + MMapDir string + RowCount int64 } type LoadFieldDataInfo struct { @@ -83,7 +82,6 @@ func (req *LoadFieldDataRequest) getCLoadFieldDataRequest() (result *cLoadFieldD defer C.free(unsafe.Pointer(mmapDir)) C.AppendMMapDirPath(cLoadFieldDataInfo, mmapDir) } - C.SetRecovering(cLoadFieldDataInfo, 
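// Editor's note on the InitSegcore hunk above: it wires a third (middle)
// priority coefficient into segcore alongside the existing high and low ones.
// How a coefficient maps to a pool size is not shown in this diff; a common
// scheme, and purely an assumption here, is threads = coefficient * CPU cores.
// A hypothetical sketch under that assumption:
package sketch

import "runtime"

// poolSize derives a thread count from a priority coefficient; a larger
// coefficient yields a proportionally larger pool for that priority class.
func poolSize(coefficient int64) int {
	return int(coefficient) * runtime.NumCPU()
}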
C.bool(req.Recovering)) return &cLoadFieldDataRequest{ cLoadFieldDataInfo: cLoadFieldDataInfo, }, nil diff --git a/pkg/proto/cgo_msg.proto b/pkg/proto/cgo_msg.proto index a5ba4e5d16..1ebc91dcbe 100644 --- a/pkg/proto/cgo_msg.proto +++ b/pkg/proto/cgo_msg.proto @@ -21,7 +21,6 @@ message LoadIndexInfo { int64 index_store_version = 14; int32 index_engine_version = 15; int64 index_file_size = 16; - bool recovering = 17; } message IndexStats { diff --git a/pkg/proto/cgopb/cgo_msg.pb.go b/pkg/proto/cgopb/cgo_msg.pb.go index 843c458631..90c297cc6b 100644 --- a/pkg/proto/cgopb/cgo_msg.pb.go +++ b/pkg/proto/cgopb/cgo_msg.pb.go @@ -41,7 +41,6 @@ type LoadIndexInfo struct { IndexStoreVersion int64 `protobuf:"varint,14,opt,name=index_store_version,json=indexStoreVersion,proto3" json:"index_store_version,omitempty"` IndexEngineVersion int32 `protobuf:"varint,15,opt,name=index_engine_version,json=indexEngineVersion,proto3" json:"index_engine_version,omitempty"` IndexFileSize int64 `protobuf:"varint,16,opt,name=index_file_size,json=indexFileSize,proto3" json:"index_file_size,omitempty"` - Recovering bool `protobuf:"varint,17,opt,name=recovering,proto3" json:"recovering,omitempty"` } func (x *LoadIndexInfo) Reset() { @@ -181,13 +180,6 @@ func (x *LoadIndexInfo) GetIndexFileSize() int64 { return 0 } -func (x *LoadIndexInfo) GetRecovering() bool { - if x != nil { - return x.Recovering - } - return false -} - type IndexStats struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -304,7 +296,7 @@ var file_cgo_msg_proto_rawDesc = []byte{ 0x0a, 0x0d, 0x63, 0x67, 0x6f, 0x5f, 0x6d, 0x73, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x10, 0x6d, 0x69, 0x6c, 0x76, 0x75, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x63, 0x67, 0x6f, 0x1a, 0x0c, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, - 0xc6, 0x05, 0x0a, 0x0d, 0x4c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x49, 0x6e, 0x66, + 0xa6, 0x05, 0x0a, 0x0d, 0x4c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x22, 0x0a, 0x0c, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x44, 0x12, 0x20, 0x0a, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, @@ -342,9 +334,7 @@ var file_cgo_msg_proto_rawDesc = []byte{ 0x64, 0x65, 0x78, 0x45, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x26, 0x0a, 0x0f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x10, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x69, 0x6e, 0x64, 0x65, 0x78, - 0x46, 0x69, 0x6c, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x1e, 0x0a, 0x0a, 0x72, 0x65, 0x63, 0x6f, - 0x76, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x18, 0x11, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x72, 0x65, - 0x63, 0x6f, 0x76, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x1a, 0x3e, 0x0a, 0x10, 0x49, 0x6e, 0x64, 0x65, + 0x46, 0x69, 0x6c, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x1a, 0x3e, 0x0a, 0x10, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, diff --git a/pkg/proto/index_cgo_msg.proto b/pkg/proto/index_cgo_msg.proto index ff4bee23b5..4a91c07f0c 100644 --- a/pkg/proto/index_cgo_msg.proto +++ b/pkg/proto/index_cgo_msg.proto @@ -93,7 
+93,6 @@ message LoadTextIndexInfo { schema.FieldSchema schema = 5; int64 collectionID = 6; int64 partitionID = 7; - bool recovering = 8; } message LoadJsonKeyIndexInfo { @@ -104,5 +103,4 @@ message LoadJsonKeyIndexInfo { schema.FieldSchema schema = 5; int64 collectionID = 6; int64 partitionID = 7; - bool recovering = 8; } diff --git a/pkg/proto/indexcgopb/index_cgo_msg.pb.go b/pkg/proto/indexcgopb/index_cgo_msg.pb.go index 906d38e755..e9eb05c5d0 100644 --- a/pkg/proto/indexcgopb/index_cgo_msg.pb.go +++ b/pkg/proto/indexcgopb/index_cgo_msg.pb.go @@ -773,7 +773,6 @@ type LoadTextIndexInfo struct { Schema *schemapb.FieldSchema `protobuf:"bytes,5,opt,name=schema,proto3" json:"schema,omitempty"` CollectionID int64 `protobuf:"varint,6,opt,name=collectionID,proto3" json:"collectionID,omitempty"` PartitionID int64 `protobuf:"varint,7,opt,name=partitionID,proto3" json:"partitionID,omitempty"` - Recovering bool `protobuf:"varint,8,opt,name=recovering,proto3" json:"recovering,omitempty"` } func (x *LoadTextIndexInfo) Reset() { @@ -857,13 +856,6 @@ func (x *LoadTextIndexInfo) GetPartitionID() int64 { return 0 } -func (x *LoadTextIndexInfo) GetRecovering() bool { - if x != nil { - return x.Recovering - } - return false -} - type LoadJsonKeyIndexInfo struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -876,7 +868,6 @@ type LoadJsonKeyIndexInfo struct { Schema *schemapb.FieldSchema `protobuf:"bytes,5,opt,name=schema,proto3" json:"schema,omitempty"` CollectionID int64 `protobuf:"varint,6,opt,name=collectionID,proto3" json:"collectionID,omitempty"` PartitionID int64 `protobuf:"varint,7,opt,name=partitionID,proto3" json:"partitionID,omitempty"` - Recovering bool `protobuf:"varint,8,opt,name=recovering,proto3" json:"recovering,omitempty"` } func (x *LoadJsonKeyIndexInfo) Reset() { @@ -960,13 +951,6 @@ func (x *LoadJsonKeyIndexInfo) GetPartitionID() int64 { return 0 } -func (x *LoadJsonKeyIndexInfo) GetRecovering() bool { - if x != nil { - return x.Recovering - } - return false -} - var File_index_cgo_msg_proto protoreflect.FileDescriptor var file_index_cgo_msg_proto_rawDesc = []byte{ @@ -1111,7 +1095,7 @@ var file_index_cgo_msg_proto_rawDesc = []byte{ 0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x73, 0x5f, 0x74, 0x61, 0x6e, 0x74, 0x69, 0x76, 0x79, 0x5f, 0x6d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x18, 0x16, 0x20, 0x01, 0x28, 0x03, 0x52, 0x19, 0x6a, 0x73, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x53, 0x74, 0x61, 0x74, 0x73, 0x54, 0x61, 0x6e, - 0x74, 0x69, 0x76, 0x79, 0x4d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x22, 0x97, 0x02, 0x0a, 0x11, 0x4c, + 0x74, 0x69, 0x76, 0x79, 0x4d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x22, 0xf7, 0x01, 0x0a, 0x11, 0x4c, 0x6f, 0x61, 0x64, 0x54, 0x65, 0x78, 0x74, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x18, 0x0a, 0x07, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x49, 0x44, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, @@ -1127,9 +1111,7 @@ var file_index_cgo_msg_proto_rawDesc = []byte{ 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x44, 0x12, 0x20, 0x0a, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x44, 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, - 0x6f, 0x6e, 0x49, 0x44, 0x12, 0x1e, 0x0a, 0x0a, 0x72, 0x65, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x69, - 0x6e, 0x67, 0x18, 0x08, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x72, 0x65, 0x63, 0x6f, 0x76, 0x65, - 0x72, 0x69, 0x6e, 0x67, 0x22, 0x9a, 
0x02, 0x0a, 0x14, 0x4c, 0x6f, 0x61, 0x64, 0x4a, 0x73, 0x6f, + 0x6f, 0x6e, 0x49, 0x44, 0x22, 0xfa, 0x01, 0x0a, 0x14, 0x4c, 0x6f, 0x61, 0x64, 0x4a, 0x73, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x18, 0x0a, 0x07, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x49, 0x44, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, @@ -1145,9 +1127,7 @@ var file_index_cgo_msg_proto_rawDesc = []byte{ 0x03, 0x52, 0x0c, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x44, 0x12, 0x20, 0x0a, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x44, 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x49, - 0x44, 0x12, 0x1e, 0x0a, 0x0a, 0x72, 0x65, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x18, - 0x08, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x72, 0x65, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x69, 0x6e, - 0x67, 0x42, 0x35, 0x5a, 0x33, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, + 0x44, 0x42, 0x35, 0x5a, 0x33, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6d, 0x69, 0x6c, 0x76, 0x75, 0x73, 0x2d, 0x69, 0x6f, 0x2f, 0x6d, 0x69, 0x6c, 0x76, 0x75, 0x73, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x76, 0x32, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x63, 0x67, 0x6f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, diff --git a/pkg/proto/query_coord.proto b/pkg/proto/query_coord.proto index 47d6144a69..f0a2c546bc 100644 --- a/pkg/proto/query_coord.proto +++ b/pkg/proto/query_coord.proto @@ -375,7 +375,6 @@ message SegmentLoadInfo { map textStatsLogs = 20; repeated data.FieldBinlog bm25logs = 21; map jsonKeyStatsLogs = 22; - bool recovering = 23; } message FieldIndexInfo { diff --git a/pkg/proto/querypb/query_coord.pb.go b/pkg/proto/querypb/query_coord.pb.go index f343ea3051..49e2e37830 100644 --- a/pkg/proto/querypb/query_coord.pb.go +++ b/pkg/proto/querypb/query_coord.pb.go @@ -2049,7 +2049,6 @@ type SegmentLoadInfo struct { TextStatsLogs map[int64]*datapb.TextIndexStats `protobuf:"bytes,20,rep,name=textStatsLogs,proto3" json:"textStatsLogs,omitempty" protobuf_key:"varint,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` Bm25Logs []*datapb.FieldBinlog `protobuf:"bytes,21,rep,name=bm25logs,proto3" json:"bm25logs,omitempty"` JsonKeyStatsLogs map[int64]*datapb.JsonKeyStats `protobuf:"bytes,22,rep,name=jsonKeyStatsLogs,proto3" json:"jsonKeyStatsLogs,omitempty" protobuf_key:"varint,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - Recovering bool `protobuf:"varint,23,opt,name=recovering,proto3" json:"recovering,omitempty"` } func (x *SegmentLoadInfo) Reset() { @@ -2239,13 +2238,6 @@ func (x *SegmentLoadInfo) GetJsonKeyStatsLogs() map[int64]*datapb.JsonKeyStats { return nil } -func (x *SegmentLoadInfo) GetRecovering() bool { - if x != nil { - return x.Recovering - } - return false -} - type FieldIndexInfo struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -7541,7 +7533,7 @@ var file_query_coord_proto_rawDesc = []byte{ 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x44, 0x12, 0x21, 0x0a, 0x0c, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x63, 0x68, - 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0xe0, 0x0a, 0x0a, 0x0f, 0x53, 
0x65, + 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0xc0, 0x0a, 0x0a, 0x0f, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x4c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x49, 0x44, 0x12, 0x20, 0x0a, 0x0b, 0x70, @@ -7613,9 +7605,7 @@ var file_query_coord_proto_rawDesc = []byte{ 0x74, 0x4c, 0x6f, 0x61, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x4a, 0x73, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x53, 0x74, 0x61, 0x74, 0x73, 0x4c, 0x6f, 0x67, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x10, 0x6a, 0x73, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x53, 0x74, 0x61, 0x74, 0x73, 0x4c, 0x6f, 0x67, - 0x73, 0x12, 0x1e, 0x0a, 0x0a, 0x72, 0x65, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x18, - 0x17, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x72, 0x65, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x69, 0x6e, - 0x67, 0x1a, 0x63, 0x0a, 0x12, 0x54, 0x65, 0x78, 0x74, 0x53, 0x74, 0x61, 0x74, 0x73, 0x4c, 0x6f, + 0x73, 0x1a, 0x63, 0x0a, 0x12, 0x54, 0x65, 0x78, 0x74, 0x53, 0x74, 0x61, 0x74, 0x73, 0x4c, 0x6f, 0x67, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x37, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x6d, 0x69, 0x6c, 0x76, 0x75,