diff --git a/internal/core/src/storage/parquet_c.cpp b/internal/core/src/storage/parquet_c.cpp index 01f954f837..7348a1c611 100644 --- a/internal/core/src/storage/parquet_c.cpp +++ b/internal/core/src/storage/parquet_c.cpp @@ -14,6 +14,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include "storage/parquet_c.h" #include "storage/PayloadReader.h" #include "storage/PayloadWriter.h" @@ -23,6 +25,19 @@ using Payload = milvus::storage::Payload; using PayloadWriter = milvus::storage::PayloadWriter; using PayloadReader = milvus::storage::PayloadReader; +void +ReleaseArrowUnused() { + static std::mutex release_mutex; + + // While multiple threads are releasing memory, + // we don't need everyone do releasing, + // just let some of them do this also works well + if (release_mutex.try_lock()) { + arrow::default_memory_pool()->ReleaseUnused(); + release_mutex.unlock(); + } +} + static const char* ErrorMsg(const std::string& msg) { if (msg.empty()) @@ -174,9 +189,10 @@ GetPayloadLengthFromWriter(CPayloadWriter payloadWriter) { extern "C" void ReleasePayloadWriter(CPayloadWriter handler) { auto p = reinterpret_cast(handler); - if (p != nullptr) + if (p != nullptr) { delete p; - arrow::default_memory_pool()->ReleaseUnused(); + ReleaseArrowUnused(); + } } extern "C" CPayloadReader @@ -350,5 +366,5 @@ extern "C" void ReleasePayloadReader(CPayloadReader payloadReader) { auto p = reinterpret_cast(payloadReader); delete (p); - arrow::default_memory_pool()->ReleaseUnused(); + ReleaseArrowUnused(); }