From 69eb1d254e4c2071ddad78f71e777e3580680fe1 Mon Sep 17 00:00:00 2001 From: groot Date: Sat, 20 Jun 2020 17:47:10 +0800 Subject: [PATCH 1/3] #2640 Signed-off-by: groot --- core/src/config/Config.cpp | 4 +- core/src/config/Config.h | 1 + core/unittest/server/test_config.cpp | 58 ++++++++++++++++++++++------ 3 files changed, 50 insertions(+), 13 deletions(-) diff --git a/core/src/config/Config.cpp b/core/src/config/Config.cpp index 74b48f47e2..967d0ca11a 100644 --- a/core/src/config/Config.cpp +++ b/core/src/config/Config.cpp @@ -863,6 +863,7 @@ Status Config::RegisterCallBack(const std::string& node, const std::string& sub_node, const std::string& key, ConfigCallBackF& cb) { std::string cb_node = node + "." + sub_node; + std::lock_guard lock(callback_mutex_); if (config_callback_.find(cb_node) == config_callback_.end()) { return Status(SERVER_UNEXPECTED_ERROR, cb_node + " is not supported changed in mem"); } @@ -875,6 +876,7 @@ Config::RegisterCallBack(const std::string& node, const std::string& sub_node, c Status Config::CancelCallBack(const std::string& node, const std::string& sub_node, const std::string& key) { + std::lock_guard lock(callback_mutex_); if (config_callback_.empty() || key.empty()) { return Status::OK(); } @@ -1950,7 +1952,7 @@ Config::GetConfigVersion(std::string& value) { Status Config::ExecCallBacks(const std::string& node, const std::string& sub_node, const std::string& value) { auto status = Status::OK(); - + std::lock_guard lock(callback_mutex_); if (config_callback_.empty()) { return Status(SERVER_UNEXPECTED_ERROR, "Callback map is empty. Cannot take effect in-service"); } diff --git a/core/src/config/Config.h b/core/src/config/Config.h index a39d0065e7..101091abaf 100644 --- a/core/src/config/Config.h +++ b/core/src/config/Config.h @@ -548,6 +548,7 @@ class Config { std::string config_file_; std::unordered_map> config_map_; std::unordered_map> config_callback_; + std::mutex callback_mutex_; std::mutex mutex_; }; diff --git a/core/unittest/server/test_config.cpp b/core/unittest/server/test_config.cpp index da41b875cc..d02ddd201d 100644 --- a/core/unittest/server/test_config.cpp +++ b/core/unittest/server/test_config.cpp @@ -11,6 +11,7 @@ #include #include +#include #include #include @@ -19,6 +20,7 @@ #include "config/Config.h" #include "config/YamlConfigMgr.h" +#include "config/handler/CacheConfigHandler.h" #include "server/utils.h" #include "utils/CommonUtil.h" #include "utils/StringHelpFunctions.h" @@ -30,10 +32,42 @@ static constexpr uint64_t KB = 1024; static constexpr uint64_t MB = KB * 1024; static constexpr uint64_t GB = MB * 1024; +class TestConfigHandler : public milvus::server::CacheConfigHandler { + public: + TestConfigHandler() { + SetIdentity("MemTableFile"); + AddInsertBufferSizeListener(); + } +}; + } // namespace namespace ms = milvus::server; +TEST_F(ConfigTest, CONFIG_HANDLER_TEST) { + auto test_func = [&]() { + uint64_t count = 10000, index = 0; + while (true) { + if (index++ == count) { + break; + } + + // register callback + TestConfigHandler ttt; + + // trigger callback + auto& config = milvus::server::Config::GetInstance(); + config.SetCacheConfigInsertBufferSize("1GB"); + } + }; + + using ThreadPtr = std::shared_ptr; + ThreadPtr thread_1 = std::make_shared(test_func); + ThreadPtr thread_2 = std::make_shared(test_func); + thread_1->join(); + thread_2->join(); +} + TEST_F(ConfigTest, CONFIG_TEST) { milvus::server::ConfigMgr* config_mgr = milvus::server::YamlConfigMgr::GetInstance(); @@ -535,7 +569,7 @@ TEST_F(ConfigTest, SERVER_CONFIG_CLI_TEST) { std::string engine_gpu_search_threshold = "800"; get_cmd = gen_get_command(ms::CONFIG_GPU_RESOURCE, ms::CONFIG_GPU_RESOURCE_GPU_SEARCH_THRESHOLD); set_cmd = gen_set_command(ms::CONFIG_GPU_RESOURCE, ms::CONFIG_GPU_RESOURCE_GPU_SEARCH_THRESHOLD, - engine_gpu_search_threshold); + engine_gpu_search_threshold); s = config.ProcessConfigCli(dummy, set_cmd); ASSERT_TRUE(s.ok()); s = config.ProcessConfigCli(result, get_cmd); @@ -584,7 +618,7 @@ TEST_F(ConfigTest, SERVER_CONFIG_CLI_TEST) { std::string build_index_resources = "gpu0"; get_cmd = gen_get_command(ms::CONFIG_GPU_RESOURCE, ms::CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES); set_cmd = - gen_set_command(ms::CONFIG_GPU_RESOURCE, ms::CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES, build_index_resources); + gen_set_command(ms::CONFIG_GPU_RESOURCE, ms::CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES, build_index_resources); s = config.ProcessConfigCli(dummy, set_cmd); ASSERT_TRUE(s.ok()); s = config.ProcessConfigCli(result, get_cmd); @@ -1255,8 +1289,8 @@ TEST_F(ConfigTest, SERVER_CONFIG_UPDATE_TEST) { std::string cmd_set, cmd_get; auto lambda = [&conf_file](const std::string& key, const std::string& child_key, - const std::string& default_value, std::string& value) { - auto * ymgr = milvus::server::YamlConfigMgr::GetInstance(); + const std::string& default_value, std::string& value) { + auto* ymgr = milvus::server::YamlConfigMgr::GetInstance(); auto status = ymgr->LoadConfigFile(conf_file); if (status.ok()) @@ -1276,52 +1310,52 @@ TEST_F(ConfigTest, SERVER_CONFIG_UPDATE_TEST) { ASSERT_TRUE(config.ProcessConfigCli(reply_set, cmd_set).ok()); ASSERT_TRUE(lambda(ms::CONFIG_CACHE, ms::CONFIG_CACHE_INSERT_BUFFER_SIZE, - ms::CONFIG_CACHE_INSERT_BUFFER_SIZE_DEFAULT, yaml_value).ok()); + ms::CONFIG_CACHE_INSERT_BUFFER_SIZE_DEFAULT, yaml_value).ok()); ASSERT_EQ("2", yaml_value); // test boolean config value cmd_set = gen_set_command(ms::CONFIG_METRIC, ms::CONFIG_METRIC_ENABLE_MONITOR, "True"); ASSERT_TRUE(config.ProcessConfigCli(reply_set, cmd_set).ok()); ASSERT_TRUE(lambda(ms::CONFIG_METRIC, ms::CONFIG_METRIC_ENABLE_MONITOR, - ms::CONFIG_METRIC_ENABLE_MONITOR_DEFAULT, yaml_value).ok()); + ms::CONFIG_METRIC_ENABLE_MONITOR_DEFAULT, yaml_value).ok()); ASSERT_EQ("true", yaml_value); cmd_set = gen_set_command(ms::CONFIG_METRIC, ms::CONFIG_METRIC_ENABLE_MONITOR, "On"); ASSERT_TRUE(config.ProcessConfigCli(reply_set, cmd_set).ok()); ASSERT_TRUE(lambda(ms::CONFIG_METRIC, ms::CONFIG_METRIC_ENABLE_MONITOR, - ms::CONFIG_METRIC_ENABLE_MONITOR_DEFAULT, yaml_value).ok()); + ms::CONFIG_METRIC_ENABLE_MONITOR_DEFAULT, yaml_value).ok()); ASSERT_EQ("true", yaml_value); cmd_set = gen_set_command(ms::CONFIG_METRIC, ms::CONFIG_METRIC_ENABLE_MONITOR, "False"); ASSERT_TRUE(config.ProcessConfigCli(reply_set, cmd_set).ok()); ASSERT_TRUE(lambda(ms::CONFIG_METRIC, ms::CONFIG_METRIC_ENABLE_MONITOR, - ms::CONFIG_METRIC_ENABLE_MONITOR_DEFAULT, yaml_value).ok()); + ms::CONFIG_METRIC_ENABLE_MONITOR_DEFAULT, yaml_value).ok()); ASSERT_EQ("false", yaml_value); cmd_set = gen_set_command(ms::CONFIG_METRIC, ms::CONFIG_METRIC_ENABLE_MONITOR, "Off"); ASSERT_TRUE(config.ProcessConfigCli(reply_set, cmd_set).ok()); ASSERT_TRUE(lambda(ms::CONFIG_METRIC, ms::CONFIG_METRIC_ENABLE_MONITOR, - ms::CONFIG_METRIC_ENABLE_MONITOR_DEFAULT, yaml_value).ok()); + ms::CONFIG_METRIC_ENABLE_MONITOR_DEFAULT, yaml_value).ok()); ASSERT_EQ("false", yaml_value); // test path cmd_set = gen_set_command(ms::CONFIG_STORAGE, ms::CONFIG_STORAGE_PATH, "/tmp/milvus_config_unittest"); ASSERT_TRUE(config.ProcessConfigCli(reply_set, cmd_set).ok()); ASSERT_TRUE(lambda(ms::CONFIG_STORAGE, ms::CONFIG_STORAGE_PATH, - ms::CONFIG_STORAGE_PATH_DEFAULT, yaml_value).ok()); + ms::CONFIG_STORAGE_PATH_DEFAULT, yaml_value).ok()); ASSERT_EQ("/tmp/milvus_config_unittest", yaml_value); #ifdef MILVUS_GPU_VERSION cmd_set = gen_set_command(ms::CONFIG_GPU_RESOURCE, ms::CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES, "gpu0"); ASSERT_TRUE(config.ProcessConfigCli(reply_set, cmd_set).ok()); ASSERT_TRUE(lambda(ms::CONFIG_GPU_RESOURCE, ms::CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES, - ms::CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES_DEFAULT, yaml_value).ok()); + ms::CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES_DEFAULT, yaml_value).ok()); ASSERT_EQ("gpu0", yaml_value); cmd_set = gen_set_command(ms::CONFIG_GPU_RESOURCE, ms::CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES, "GPU0"); ASSERT_TRUE(config.ProcessConfigCli(reply_set, cmd_set).ok()); ASSERT_TRUE(lambda(ms::CONFIG_GPU_RESOURCE, ms::CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES, - ms::CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES_DEFAULT, yaml_value).ok()); + ms::CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES_DEFAULT, yaml_value).ok()); ASSERT_EQ("gpu0", yaml_value); #endif } From ac7403f00bfbe10c217a5fcfd0a7e77fff5a24d0 Mon Sep 17 00:00:00 2001 From: groot Date: Sun, 21 Jun 2020 19:03:21 +0800 Subject: [PATCH 2/3] search combine configable Signed-off-by: groot --- core/src/config/Config.cpp | 39 +++++++++++++++++++ core/src/config/Config.h | 8 ++++ .../config/handler/EngineConfigHandler.cpp | 24 ++++++++++++ core/src/config/handler/EngineConfigHandler.h | 13 ++++++- .../delivery/request/SearchCombineRequest.cpp | 16 ++++---- .../delivery/request/SearchCombineRequest.h | 8 +++- .../delivery/strategy/SearchReqStrategy.cpp | 13 ++++++- .../delivery/strategy/SearchReqStrategy.h | 3 +- 8 files changed, 111 insertions(+), 13 deletions(-) diff --git a/core/src/config/Config.cpp b/core/src/config/Config.cpp index 967d0ca11a..619474c1d2 100644 --- a/core/src/config/Config.cpp +++ b/core/src/config/Config.cpp @@ -114,6 +114,8 @@ const char* CONFIG_ENGINE_OMP_THREAD_NUM = "omp_thread_num"; const char* CONFIG_ENGINE_OMP_THREAD_NUM_DEFAULT = "0"; const char* CONFIG_ENGINE_SIMD_TYPE = "simd_type"; const char* CONFIG_ENGINE_SIMD_TYPE_DEFAULT = "auto"; +const char* CONFIG_ENGINE_SEARCH_COMBINE_MAX_NQ = "search_combine_nq"; +const char* CONFIG_ENGINE_SEARCH_COMBINE_MAX_NQ_DEFAULT = "64"; /* gpu resource config */ const char* CONFIG_GPU_RESOURCE = "gpu"; @@ -200,6 +202,9 @@ Config::Config() { std::string node_blas_threshold = std::string(CONFIG_ENGINE) + "." + CONFIG_ENGINE_USE_BLAS_THRESHOLD; config_callback_[node_blas_threshold] = empty_map; + std::string node_search_combine = std::string(CONFIG_ENGINE) + "." + CONFIG_ENGINE_SEARCH_COMBINE_MAX_NQ; + config_callback_[node_search_combine] = empty_map; + // gpu resources config std::string node_gpu_enable = std::string(CONFIG_GPU_RESOURCE) + "." + CONFIG_GPU_RESOURCE_ENABLE; config_callback_[node_gpu_enable] = empty_map; @@ -477,6 +482,7 @@ Config::ResetDefaultConfig() { STATUS_CHECK(SetEngineConfigUseBlasThreshold(CONFIG_ENGINE_USE_BLAS_THRESHOLD_DEFAULT)); STATUS_CHECK(SetEngineConfigOmpThreadNum(CONFIG_ENGINE_OMP_THREAD_NUM_DEFAULT)); STATUS_CHECK(SetEngineConfigSimdType(CONFIG_ENGINE_SIMD_TYPE_DEFAULT)); + STATUS_CHECK(SetEngineSearchCombineMaxNq(CONFIG_ENGINE_SEARCH_COMBINE_MAX_NQ_DEFAULT)); /* gpu resource config */ #ifdef MILVUS_GPU_VERSION @@ -613,6 +619,8 @@ Config::SetConfigCli(const std::string& parent_key, const std::string& child_key status = SetEngineConfigOmpThreadNum(value); } else if (child_key == CONFIG_ENGINE_SIMD_TYPE) { status = SetEngineConfigSimdType(value); + } else if (child_key == CONFIG_ENGINE_SEARCH_COMBINE_MAX_NQ) { + status = SetEngineSearchCombineMaxNq(value); } else { status = Status(SERVER_UNEXPECTED_ERROR, invalid_node_str); } @@ -1552,6 +1560,18 @@ Config::CheckEngineConfigSimdType(const std::string& value) { return Status::OK(); } +Status +Config::CheckEngineSearchCombineMaxNq(const std::string& value) { + fiu_return_on("check_config_search_combine_nq_fail", Status(SERVER_INVALID_ARGUMENT, "")); + + if (!ValidationUtil::ValidateStringIsNumber(value).ok()) { + std::string msg = "Invalid omp thread num: " + value + + ". Possible reason: engine_config.omp_thread_num is not a positive integer."; + return Status(SERVER_INVALID_ARGUMENT, msg); + } + return Status::OK(); +} + #ifdef MILVUS_GPU_VERSION /* gpu resource config */ @@ -2247,6 +2267,15 @@ Config::GetEngineConfigSimdType(std::string& value) { return CheckEngineConfigSimdType(value); } +Status +Config::GetEngineSearchCombineMaxNq(int64_t& value) { + std::string str = + GetConfigStr(CONFIG_ENGINE, CONFIG_ENGINE_SEARCH_COMBINE_MAX_NQ, CONFIG_ENGINE_SEARCH_COMBINE_MAX_NQ_DEFAULT); + // STATUS_CHECK(CheckEngineSearchCombineMaxNq(str)); + value = std::stoll(str); + return Status::OK(); +} + /* gpu resource config */ #ifdef MILVUS_GPU_VERSION @@ -2456,6 +2485,7 @@ Config::SetClusterConfigEnable(const std::string& value) { STATUS_CHECK(CheckClusterConfigEnable(value)); return SetConfigValueInMem(CONFIG_CLUSTER, CONFIG_CLUSTER_ENABLE, value); } + Status Config::SetClusterConfigRole(const std::string& value) { STATUS_CHECK(CheckClusterConfigRole(value)); @@ -2685,8 +2715,16 @@ Config::SetEngineConfigSimdType(const std::string& value) { return SetConfigValueInMem(CONFIG_ENGINE, CONFIG_ENGINE_SIMD_TYPE, value); } +Status +Config::SetEngineSearchCombineMaxNq(const std::string& value) { + STATUS_CHECK(CheckEngineSearchCombineMaxNq(value)); + STATUS_CHECK(SetConfigValueInMem(CONFIG_ENGINE, CONFIG_ENGINE_SEARCH_COMBINE_MAX_NQ, value)); + return ExecCallBacks(CONFIG_ENGINE, CONFIG_ENGINE_SEARCH_COMBINE_MAX_NQ, value); +} + /* gpu resource config */ #ifdef MILVUS_GPU_VERSION + Status Config::SetGpuResourceConfigEnable(const std::string& value) { STATUS_CHECK(CheckGpuResourceConfigEnable(value)); @@ -2731,6 +2769,7 @@ Config::SetGpuResourceConfigBuildIndexResources(const std::string& value) { STATUS_CHECK(SetConfigValueInMem(CONFIG_GPU_RESOURCE, CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES, value)); return ExecCallBacks(CONFIG_GPU_RESOURCE, CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES, value); } + #endif /* tracing config */ diff --git a/core/src/config/Config.h b/core/src/config/Config.h index 101091abaf..89457422ba 100644 --- a/core/src/config/Config.h +++ b/core/src/config/Config.h @@ -102,6 +102,8 @@ extern const char* CONFIG_ENGINE_OMP_THREAD_NUM; extern const char* CONFIG_ENGINE_OMP_THREAD_NUM_DEFAULT; extern const char* CONFIG_ENGINE_SIMD_TYPE; extern const char* CONFIG_ENGINE_SIMD_TYPE_DEFAULT; +extern const char* CONFIG_ENGINE_SEARCH_COMBINE_MAX_NQ; +extern const char* CONFIG_ENGINE_SEARCH_COMBINE_MAX_NQ_DEFAULT; /* gpu resource config */ extern const char* CONFIG_GPU_RESOURCE; @@ -268,6 +270,8 @@ class Config { CheckEngineConfigOmpThreadNum(const std::string& value); Status CheckEngineConfigSimdType(const std::string& value); + Status + CheckEngineSearchCombineMaxNq(const std::string& value); #ifdef MILVUS_GPU_VERSION /* gpu resource config */ @@ -388,6 +392,8 @@ class Config { GetEngineConfigOmpThreadNum(int64_t& value); Status GetEngineConfigSimdType(std::string& value); + Status + GetEngineSearchCombineMaxNq(int64_t& value); #ifdef MILVUS_GPU_VERSION /* gpu resource config */ @@ -500,6 +506,8 @@ class Config { SetEngineConfigOmpThreadNum(const std::string& value); Status SetEngineConfigSimdType(const std::string& value); + Status + SetEngineSearchCombineMaxNq(const std::string& value); #ifdef MILVUS_GPU_VERSION /* gpu resource config */ diff --git a/core/src/config/handler/EngineConfigHandler.cpp b/core/src/config/handler/EngineConfigHandler.cpp index 51b08e17ba..e838bc0773 100644 --- a/core/src/config/handler/EngineConfigHandler.cpp +++ b/core/src/config/handler/EngineConfigHandler.cpp @@ -19,10 +19,12 @@ namespace server { EngineConfigHandler::EngineConfigHandler() { auto& config = Config::GetInstance(); config.GetEngineConfigUseBlasThreshold(use_blas_threshold_); + config.GetEngineSearchCombineMaxNq(search_combine_nq_); } EngineConfigHandler::~EngineConfigHandler() { RemoveUseBlasThresholdListener(); + RemoveSearchCombineMaxNqListener(); } //////////////////////////// Listener methods ////////////////////////////////// @@ -48,5 +50,27 @@ EngineConfigHandler::RemoveUseBlasThresholdListener() { config.CancelCallBack(CONFIG_ENGINE, CONFIG_ENGINE_USE_BLAS_THRESHOLD, identity_); } +void +EngineConfigHandler::AddSearchCombineMaxNqListener() { + ConfigCallBackF lambda = [this](const std::string& value) -> Status { + auto& config = server::Config::GetInstance(); + auto status = config.GetEngineSearchCombineMaxNq(search_combine_nq_); + if (status.ok()) { + OnSearchCombineMaxNqChanged(search_combine_nq_); + } + + return status; + }; + + auto& config = Config::GetInstance(); + config.RegisterCallBack(CONFIG_ENGINE, CONFIG_ENGINE_SEARCH_COMBINE_MAX_NQ, identity_, lambda); +} + +void +EngineConfigHandler::RemoveSearchCombineMaxNqListener() { + auto& config = Config::GetInstance(); + config.CancelCallBack(CONFIG_ENGINE, CONFIG_ENGINE_SEARCH_COMBINE_MAX_NQ, identity_); +} + } // namespace server } // namespace milvus diff --git a/core/src/config/handler/EngineConfigHandler.h b/core/src/config/handler/EngineConfigHandler.h index ebc055c7c8..3fed6c9847 100644 --- a/core/src/config/handler/EngineConfigHandler.h +++ b/core/src/config/handler/EngineConfigHandler.h @@ -28,16 +28,27 @@ class EngineConfigHandler : virtual public ConfigHandler { OnUseBlasThresholdChanged(int64_t threshold) { } + virtual void + OnSearchCombineMaxNqChanged(int64_t nq) { + search_combine_nq_ = nq; + } + protected: void AddUseBlasThresholdListener(); - protected: void RemoveUseBlasThresholdListener(); + void + AddSearchCombineMaxNqListener(); + + void + RemoveSearchCombineMaxNqListener(); + protected: int64_t use_blas_threshold_ = std::stoll(CONFIG_ENGINE_USE_BLAS_THRESHOLD_DEFAULT); + int64_t search_combine_nq_ = std::stoll(CONFIG_ENGINE_SEARCH_COMBINE_MAX_NQ_DEFAULT); }; } // namespace server diff --git a/core/src/server/delivery/request/SearchCombineRequest.cpp b/core/src/server/delivery/request/SearchCombineRequest.cpp index fa0c666640..0d08f87e46 100644 --- a/core/src/server/delivery/request/SearchCombineRequest.cpp +++ b/core/src/server/delivery/request/SearchCombineRequest.cpp @@ -27,7 +27,6 @@ namespace server { namespace { constexpr int64_t MAX_TOPK_GAP = 200; -constexpr uint64_t MAX_NQ = 200; void GetUniqueList(const std::vector& list, std::set& unique_list) { @@ -93,7 +92,8 @@ class TracingContextList { } // namespace -SearchCombineRequest::SearchCombineRequest() : BaseRequest(nullptr, BaseRequest::kSearchCombine) { +SearchCombineRequest::SearchCombineRequest(int64_t max_nq) + : BaseRequest(nullptr, BaseRequest::kSearchCombine), combine_max_nq_(max_nq) { } Status @@ -133,6 +133,8 @@ SearchCombineRequest::Combine(const SearchRequestPtr& request) { } request_list_.push_back(request); + vectors_data_.vector_count_ += request->VectorsData().vector_count_; + return Status::OK(); } @@ -152,11 +154,11 @@ SearchCombineRequest::CanCombine(const SearchRequestPtr& request) { } // sum of nq must less-equal than MAX_NQ - if (vectors_data_.vector_count_ > MAX_NQ || request->VectorsData().vector_count_ > MAX_NQ) { + if (vectors_data_.vector_count_ > combine_max_nq_ || request->VectorsData().vector_count_ > combine_max_nq_) { return false; } uint64_t total_nq = vectors_data_.vector_count_ + request->VectorsData().vector_count_; - if (total_nq > MAX_NQ) { + if (total_nq > combine_max_nq_) { return false; } @@ -178,7 +180,7 @@ SearchCombineRequest::CanCombine(const SearchRequestPtr& request) { } bool -SearchCombineRequest::CanCombine(const SearchRequestPtr& left, const SearchRequestPtr& right) { +SearchCombineRequest::CanCombine(const SearchRequestPtr& left, const SearchRequestPtr& right, int64_t max_nq) { if (left->CollectionName() != right->CollectionName()) { return false; } @@ -193,11 +195,11 @@ SearchCombineRequest::CanCombine(const SearchRequestPtr& left, const SearchReque } // sum of nq must less-equal than MAX_NQ - if (left->VectorsData().vector_count_ > MAX_NQ || right->VectorsData().vector_count_ > MAX_NQ) { + if (left->VectorsData().vector_count_ > max_nq || right->VectorsData().vector_count_ > max_nq) { return false; } uint64_t total_nq = left->VectorsData().vector_count_ + right->VectorsData().vector_count_; - if (total_nq > MAX_NQ) { + if (total_nq > max_nq) { return false; } diff --git a/core/src/server/delivery/request/SearchCombineRequest.h b/core/src/server/delivery/request/SearchCombineRequest.h index 3aa24bb928..a455c130d9 100644 --- a/core/src/server/delivery/request/SearchCombineRequest.h +++ b/core/src/server/delivery/request/SearchCombineRequest.h @@ -22,9 +22,11 @@ namespace milvus { namespace server { +constexpr int64_t COMBINE_MAX_NQ = 64; + class SearchCombineRequest : public BaseRequest { public: - SearchCombineRequest(); + SearchCombineRequest(int64_t max_nq = COMBINE_MAX_NQ); Status Combine(const SearchRequestPtr& request); @@ -33,7 +35,7 @@ class SearchCombineRequest : public BaseRequest { CanCombine(const SearchRequestPtr& request); static bool - CanCombine(const SearchRequestPtr& left, const SearchRequestPtr& right); + CanCombine(const SearchRequestPtr& left, const SearchRequestPtr& right, int64_t max_nq = COMBINE_MAX_NQ); protected: Status @@ -54,6 +56,8 @@ class SearchCombineRequest : public BaseRequest { std::set file_id_list_; std::vector request_list_; + + int64_t combine_max_nq_ = COMBINE_MAX_NQ; }; using SearchCombineRequestPtr = std::shared_ptr; diff --git a/core/src/server/delivery/strategy/SearchReqStrategy.cpp b/core/src/server/delivery/strategy/SearchReqStrategy.cpp index 3b49ed6964..0b66ca7b57 100644 --- a/core/src/server/delivery/strategy/SearchReqStrategy.cpp +++ b/core/src/server/delivery/strategy/SearchReqStrategy.cpp @@ -10,6 +10,7 @@ // or implied. See the License for the specific language governing permissions and limitations under the License. #include "server/delivery/strategy/SearchReqStrategy.h" +#include "config/Config.h" #include "server/delivery/request/SearchCombineRequest.h" #include "server/delivery/request/SearchRequest.h" #include "utils/CommonUtil.h" @@ -24,6 +25,8 @@ namespace milvus { namespace server { SearchReqStrategy::SearchReqStrategy() { + SetIdentity("SearchReqStrategy"); + AddSearchCombineMaxNqListener(); } Status @@ -34,15 +37,21 @@ SearchReqStrategy::ReScheduleQueue(const BaseRequestPtr& request, std::queue(request); BaseRequestPtr last_req = queue.back(); if (last_req->GetRequestType() == BaseRequest::kSearch) { SearchRequestPtr last_search_req = std::static_pointer_cast(last_req); - if (SearchCombineRequest::CanCombine(last_search_req, new_search_req)) { + if (SearchCombineRequest::CanCombine(last_search_req, new_search_req, search_combine_nq_)) { // combine request - SearchCombineRequestPtr combine_request = std::make_shared(); + SearchCombineRequestPtr combine_request = std::make_shared(search_combine_nq_); combine_request->Combine(last_search_req); combine_request->Combine(new_search_req); queue.back() = combine_request; // replace the last request to combine request diff --git a/core/src/server/delivery/strategy/SearchReqStrategy.h b/core/src/server/delivery/strategy/SearchReqStrategy.h index 20093c66c2..3d2c3de03b 100644 --- a/core/src/server/delivery/strategy/SearchReqStrategy.h +++ b/core/src/server/delivery/strategy/SearchReqStrategy.h @@ -11,6 +11,7 @@ #pragma once +#include "config/handler/EngineConfigHandler.h" #include "server/delivery/strategy/RequestStrategy.h" #include "utils/Status.h" @@ -20,7 +21,7 @@ namespace milvus { namespace server { -class SearchReqStrategy : public RequestStrategy { +class SearchReqStrategy : public RequestStrategy, public EngineConfigHandler { public: SearchReqStrategy(); From cb09380c6c77851491edc06e368a6e1aa8d2ec91 Mon Sep 17 00:00:00 2001 From: groot Date: Sun, 21 Jun 2020 21:33:05 +0800 Subject: [PATCH 3/3] typo Signed-off-by: groot --- core/src/server/delivery/request/SearchCombineRequest.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/server/delivery/request/SearchCombineRequest.h b/core/src/server/delivery/request/SearchCombineRequest.h index a455c130d9..d71c4c70c7 100644 --- a/core/src/server/delivery/request/SearchCombineRequest.h +++ b/core/src/server/delivery/request/SearchCombineRequest.h @@ -26,7 +26,7 @@ constexpr int64_t COMBINE_MAX_NQ = 64; class SearchCombineRequest : public BaseRequest { public: - SearchCombineRequest(int64_t max_nq = COMBINE_MAX_NQ); + explicit SearchCombineRequest(int64_t max_nq = COMBINE_MAX_NQ); Status Combine(const SearchRequestPtr& request);