diff --git a/configs/milvus.yaml b/configs/milvus.yaml index d879806466..783174195c 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -305,9 +305,10 @@ common: # please adjust in embedded Milvus: local storageType: minio - security: authorizationEnabled: false # tls mode values [0, 1, 2] # 0 is close, 1 is one-way authentication, 2 is two-way authentication. tlsMode: 0 + + mem_purge_ratio: 0.2 # in Linux os, if memory-fragmentation-size >= used-memory * ${mem_purge_ratio}, then do `malloc_trim` diff --git a/internal/core/src/common/CMakeLists.txt b/internal/core/src/common/CMakeLists.txt index f942a07715..1a0d5659d3 100644 --- a/internal/core/src/common/CMakeLists.txt +++ b/internal/core/src/common/CMakeLists.txt @@ -14,6 +14,7 @@ set(COMMON_SRC Types.cpp SystemProperty.cpp vector_index_c.cpp + memory_c.cpp ) add_library(milvus_common SHARED diff --git a/internal/core/src/common/memory_c.cpp b/internal/core/src/common/memory_c.cpp new file mode 100644 index 0000000000..8b935449b7 --- /dev/null +++ b/internal/core/src/common/memory_c.cpp @@ -0,0 +1,90 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifdef __linux__ +#include +#include +#endif + +#include "common/CGoHelper.h" +#include "common/memory_c.h" +#include "exceptions/EasyAssert.h" +#include "log/Log.h" + +void +DoMallocTrim() { +#ifdef __linux__ + malloc_trim(0); +#endif +} + +uint64_t +ParseMallocInfo() { +#ifdef __linux__ + char* mem_buffer; + size_t buffer_size; + FILE* stream; + stream = open_memstream(&mem_buffer, &buffer_size); + // malloc_info(0, stdout); + + /* + * The malloc_info() function exports an XML string that describes + * the current state of the memory-allocation implementation in the caller. + * The exported XML string includes information about `fast` and `rest`. + * According to the implementation of glibc, `fast` calculates ths size of all the + * fastbins, and `rest` calculates the size of all the bins except fastbins. + * ref: + * + * + */ + malloc_info(0, stream); + fflush(stream); + + rapidxml::xml_document<> doc; // character type defaults to char + doc.parse<0>(mem_buffer); // 0 means default parse flags + + rapidxml::xml_node<>* malloc_root_node = doc.first_node(); + auto total_fast_node = malloc_root_node->first_node()->next_sibling("total"); + AssertInfo(total_fast_node, "null total_fast_node detected when ParseMallocInfo"); + auto total_fast_size = std::stoul(total_fast_node->first_attribute("size")->value()); + + auto total_rest_node = total_fast_node->next_sibling("total"); + AssertInfo(total_fast_node, "null total_rest_node detected when ParseMallocInfo"); + auto total_rest_size = std::stoul(total_rest_node->first_attribute("size")->value()); + + fclose(stream); + free(mem_buffer); + + return total_fast_size + total_rest_size; +#else + return 0; // malloc_trim is unnecessary +#endif +} + +CStatus +PurgeMemory(uint64_t max_bins_size) { + try { + auto fast_and_rest_total = ParseMallocInfo(); + if (fast_and_rest_total >= max_bins_size) { + LOG_SEGCORE_DEBUG_ << "Purge memory fragmentation, max_bins_size(bytes) = " << max_bins_size + << ", fast_and_rest_total(bytes) = " << fast_and_rest_total; + DoMallocTrim(); + } + return milvus::SuccessCStatus(); + } catch (std::exception& e) { + return milvus::FailureCStatus(UnexpectedError, e.what()); + } +} diff --git a/internal/core/src/common/memory_c.h b/internal/core/src/common/memory_c.h new file mode 100644 index 0000000000..ccc2845a18 --- /dev/null +++ b/internal/core/src/common/memory_c.h @@ -0,0 +1,36 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#ifdef __cplusplus +extern "C" { +#endif + +#include "common/type_c.h" + +/* + * In glibc, free chunks are stored in various lists based on size and history, + * so that the library can quickly find suitable chunks to satisfy allocation requests. + * The lists, called "bins". + * ref: + */ +CStatus +PurgeMemory(uint64_t max_bins_size); + +#ifdef __cplusplus +} +#endif diff --git a/internal/core/src/indexbuilder/index_c.cpp b/internal/core/src/indexbuilder/index_c.cpp index 6800009cd3..f186aa8279 100644 --- a/internal/core/src/indexbuilder/index_c.cpp +++ b/internal/core/src/indexbuilder/index_c.cpp @@ -11,10 +11,8 @@ #include -#ifndef __APPLE__ - +#ifdef __linux__ #include - #endif #include "exceptions/EasyAssert.h" diff --git a/internal/core/src/segcore/collection_c.cpp b/internal/core/src/segcore/collection_c.cpp index b3bd13e7a3..cd643e8247 100644 --- a/internal/core/src/segcore/collection_c.cpp +++ b/internal/core/src/segcore/collection_c.cpp @@ -9,7 +9,7 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License -#ifndef __APPLE__ +#ifdef __linux__ #include #endif diff --git a/internal/core/src/segcore/reduce_c.cpp b/internal/core/src/segcore/reduce_c.cpp index f7cfa7af7d..19828867d4 100644 --- a/internal/core/src/segcore/reduce_c.cpp +++ b/internal/core/src/segcore/reduce_c.cpp @@ -9,9 +9,6 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License -#ifndef __APPLE__ -#include -#endif #include #include "Reduce.h" #include "common/CGoHelper.h" @@ -86,7 +83,4 @@ DeleteSearchResultDataBlobs(CSearchResultDataBlobs cSearchResultDataBlobs) { } auto search_result_data_blobs = reinterpret_cast(cSearchResultDataBlobs); delete search_result_data_blobs; -#ifdef __linux__ - malloc_trim(0); -#endif } diff --git a/internal/core/thirdparty/CMakeLists.txt b/internal/core/thirdparty/CMakeLists.txt index 4ab3100d01..fe7db5a5ce 100644 --- a/internal/core/thirdparty/CMakeLists.txt +++ b/internal/core/thirdparty/CMakeLists.txt @@ -72,4 +72,9 @@ endif() # ******************************* Thridparty jemalloc ******************************** #if ( LINUX ) # add_subdirectory( jemalloc ) -#endif() \ No newline at end of file +#endif() + +# ******************************* Thridparty rapidxml ******************************** +if ( LINUX ) + add_subdirectory( rapidxml ) +endif() diff --git a/internal/core/thirdparty/rapidxml/CMakeLists.txt b/internal/core/thirdparty/rapidxml/CMakeLists.txt new file mode 100644 index 0000000000..2d956d27cb --- /dev/null +++ b/internal/core/thirdparty/rapidxml/CMakeLists.txt @@ -0,0 +1,20 @@ +# Licensed to the LF AI & Data foundation under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +message("Download rapidxml to ${CMAKE_INSTALL_PREFIX}/include/rapidxml.hpp") +file(DOWNLOAD + https://raw.githubusercontent.com/dwd/rapidxml/master/rapidxml.hpp + ${CMAKE_INSTALL_PREFIX}/include/rapidxml/rapidxml.hpp) \ No newline at end of file diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index c5b7464e6d..a3c9f3a24d 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -23,6 +23,7 @@ #include #include "common/LoadInfo.h" +#include "common/memory_c.h" #include "pb/plan.pb.h" #include "query/ExprImpl.h" #include "segcore/Collection.h" @@ -191,6 +192,32 @@ serialize(const Message* msg) { return ret; } +#ifdef __linux__ + +//TEST(Common, Memory_benchmark) { +// auto run_times = 1000000; +// auto start = std::chrono::high_resolution_clock::now(); +// +// for (int i = 0; i < run_times; i++) { +// PurgeMemory(UINT64_MAX /*never malloc_trim*/); +// } +// +// auto stop = std::chrono::high_resolution_clock::now(); +// auto duration = std::chrono::duration_cast(stop - start); +// +// std::cout << run_times << " times taken by PurgeMemory: " << duration.count() << " milliseconds" << std::endl; +// // 1000000 times taken by PurgeMemory: 8307 milliseconds +//} + +TEST(Common, Memory) { + auto res = PurgeMemory(UINT64_MAX /*never malloc_trim*/); + assert(res.error_code == Success); + res = PurgeMemory(0); + assert(res.error_code == Success); +} + +#endif + TEST(CApiTest, InsertTest) { auto c_collection = NewCollection(get_default_schema_config()); auto segment = NewSegment(c_collection, Growing, -1); diff --git a/internal/querynode/reduce.go b/internal/querynode/reduce.go index 09c87e3967..148f3c5a89 100644 --- a/internal/querynode/reduce.go +++ b/internal/querynode/reduce.go @@ -28,6 +28,10 @@ package querynode import "C" import ( "fmt" + + "github.com/milvus-io/milvus/internal/log" + memutil "github.com/milvus-io/milvus/internal/util/memutil" + metricsinfo "github.com/milvus-io/milvus/internal/util/metricsinfo" ) type sliceInfo struct { @@ -132,6 +136,16 @@ func getSearchResultDataBlob(cSearchResultDataBlobs searchResultDataBlobs, blobI func deleteSearchResultDataBlobs(cSearchResultDataBlobs searchResultDataBlobs) { C.DeleteSearchResultDataBlobs(cSearchResultDataBlobs) + // try to do a purgeMemory operation after DeleteSearchResultDataBlobs + usedMem := metricsinfo.GetUsedMemoryCount() + if usedMem == 0 { + log.Error("Get 0 uesdMemory when deleteSearchResultDataBlobs, which is unexpected") + return + } + maxBinsSize := uint64(float64(usedMem) * Params.CommonCfg.MemPurgeRatio) + if err := memutil.PurgeMemory(maxBinsSize); err != nil { + log.Error(err.Error()) + } } func deleteSearchResults(results []*SearchResult) { diff --git a/internal/util/memutil/purge_memory.go b/internal/util/memutil/purge_memory.go new file mode 100644 index 0000000000..adecaa022b --- /dev/null +++ b/internal/util/memutil/purge_memory.go @@ -0,0 +1,43 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package metricsinfo + +/* +#cgo CFLAGS: -I${SRCDIR}/../../core/output/include + +#cgo darwin LDFLAGS: -L${SRCDIR}/../../core/output/lib -lmilvus_common -lmilvus_segcore -Wl,-rpath,"${SRCDIR}/../../core/output/lib" +#cgo linux LDFLAGS: -L${SRCDIR}/../../core/output/lib -lmilvus_common -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../../core/output/lib +#cgo windows LDFLAGS: -L${SRCDIR}/../../core/output/lib -lmilvus_common -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../../core/output/lib + +#include +#include "common/vector_index_c.h" +#include "common/memory_c.h" +*/ +import "C" +import ( + "fmt" + "unsafe" +) + +func PurgeMemory(maxBinsSize uint64) error { + cMaxBinsSize := C.uint64_t(maxBinsSize) + status := C.PurgeMemory(cMaxBinsSize) + if status.error_code == 0 { + return nil + } + defer C.free(unsafe.Pointer(status.error_msg)) + + errorMsg := string(C.GoString(status.error_msg)) + errorCode := int32(status.error_code) + + return fmt.Errorf("PurgeMemory failed, errorCode = %d, errorMsg = %s", errorCode, errorMsg) +} diff --git a/internal/util/memutil/purge_memory_test.go b/internal/util/memutil/purge_memory_test.go new file mode 100644 index 0000000000..d1839c4f22 --- /dev/null +++ b/internal/util/memutil/purge_memory_test.go @@ -0,0 +1,28 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package metricsinfo + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/util/metricsinfo" +) + +func TestPurgeMemory(t *testing.T) { + usedMem := metricsinfo.GetUsedMemoryCount() + assert.True(t, usedMem > 0) + maxBinsSize := uint64(float64(usedMem) * 0.2) + err := PurgeMemory(maxBinsSize) + assert.NoError(t, err) +} diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go index a7b964c69b..b9c10a1d3e 100644 --- a/internal/util/paramtable/component_param.go +++ b/internal/util/paramtable/component_param.go @@ -132,6 +132,7 @@ type commonConfig struct { SimdType string AuthorizationEnabled bool + MemPurgeRatio float64 } func (p *commonConfig) init(base *BaseTable) { @@ -170,6 +171,7 @@ func (p *commonConfig) init(base *BaseTable) { p.initStorageType() p.initEnableAuthorization() + p.initMemoryPurgeRatio() } func (p *commonConfig) initClusterPrefix() { @@ -381,6 +383,10 @@ func (p *commonConfig) initEnableAuthorization() { p.AuthorizationEnabled = p.Base.ParseBool("common.security.authorizationEnabled", false) } +func (p *commonConfig) initMemoryPurgeRatio() { + p.MemPurgeRatio = p.Base.ParseFloatWithDefault("common.mem_purge_ratio", 0.2) +} + /////////////////////////////////////////////////////////////////////////////// // --- rootcoord --- type rootCoordConfig struct {