diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index 68020474ac..de92f323ba 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -1023,6 +1023,10 @@ DBImpl::Flush(const std::string& collection_id) { if (lsn != 0) { swn_wal_.Notify(); flush_req_swn_.Wait(); + } else { + // no collection flushed, call merge task to cleanup files + std::set merge_collection_ids; + StartMergeTask(merge_collection_ids); } } else { LOG_ENGINE_DEBUG_ << "MemTable flush"; @@ -1050,6 +1054,10 @@ DBImpl::Flush() { if (lsn != 0) { swn_wal_.Notify(); flush_req_swn_.Wait(); + } else { + // no collection flushed, call merge task to cleanup files + std::set merge_collection_ids; + StartMergeTask(merge_collection_ids); } } else { LOG_ENGINE_DEBUG_ << "MemTable flush"; @@ -2557,16 +2565,11 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) { return max_lsn; }; - auto partition_flushed = [&](const std::string& collection_id, const std::string& partition, - const std::string& target_collection_name) { - if (options_.wal_enable_) { - uint64_t lsn = 0; - meta_ptr_->GetCollectionFlushLSN(target_collection_name, lsn); - wal_mgr_->PartitionFlushed(collection_id, partition, lsn); + auto force_flush_if_mem_full = [&]() -> uint64_t { + if (mem_mgr_->GetCurrentMem() > options_.insert_buffer_size_) { + LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) << "Insert buffer size exceeds limit. Force flush"; + InternalFlush(); } - - std::set merge_collection_ids = {target_collection_name}; - StartMergeTask(merge_collection_ids); }; Status status; @@ -2580,15 +2583,12 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) { return status; } - std::set flushed_collections; - status = mem_mgr_->InsertEntities(target_collection_name, record.length, record.ids, - (record.data_size / record.length / sizeof(float)), - (const float*)record.data, record.attr_nbytes, record.attr_data_size, - record.attr_data, record.lsn, flushed_collections); - if (!flushed_collections.empty()) { - partition_flushed(record.collection_id, record.partition_tag, target_collection_name); - } + status = mem_mgr_->InsertEntities( + target_collection_name, record.length, record.ids, (record.data_size / record.length / sizeof(float)), + (const float*)record.data, record.attr_nbytes, record.attr_data_size, record.attr_data, record.lsn); + force_flush_if_mem_full(); + // metrics milvus::server::CollectInsertMetrics metrics(record.length, status); break; } @@ -2600,14 +2600,10 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) { return status; } - std::set flushed_collections; status = mem_mgr_->InsertVectors(target_collection_name, record.length, record.ids, (record.data_size / record.length / sizeof(uint8_t)), - (const u_int8_t*)record.data, record.lsn, flushed_collections); - // even though !status.ok, run - if (!flushed_collections.empty()) { - partition_flushed(record.collection_id, record.partition_tag, target_collection_name); - } + (const u_int8_t*)record.data, record.lsn); + force_flush_if_mem_full(); // metrics milvus::server::CollectInsertMetrics metrics(record.length, status); @@ -2622,14 +2618,10 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) { return status; } - std::set flushed_collections; status = mem_mgr_->InsertVectors(target_collection_name, record.length, record.ids, (record.data_size / record.length / sizeof(float)), - (const float*)record.data, record.lsn, flushed_collections); - // even though !status.ok, run - if (!flushed_collections.empty()) { - partition_flushed(record.collection_id, record.partition_tag, target_collection_name); - } + (const float*)record.data, record.lsn); + force_flush_if_mem_full(); // metrics milvus::server::CollectInsertMetrics metrics(record.length, status); diff --git a/core/src/db/insert/MemManager.h b/core/src/db/insert/MemManager.h index aeb5354eea..53389654ca 100644 --- a/core/src/db/insert/MemManager.h +++ b/core/src/db/insert/MemManager.h @@ -27,18 +27,17 @@ class MemManager { public: virtual Status InsertVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim, - const float* vectors, uint64_t lsn, std::set& flushed_tables) = 0; + const float* vectors, uint64_t lsn) = 0; virtual Status InsertVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim, - const uint8_t* vectors, uint64_t lsn, std::set& flushed_tables) = 0; + const uint8_t* vectors, uint64_t lsn) = 0; virtual Status InsertEntities(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim, const float* vectors, const std::unordered_map& attr_nbytes, const std::unordered_map& attr_size, - const std::unordered_map>& attr_data, uint64_t lsn, - std::set& flushed_tables) = 0; + const std::unordered_map>& attr_data, uint64_t lsn) = 0; virtual Status DeleteVector(const std::string& collection_id, IDNumber vector_id, uint64_t lsn) = 0; @@ -47,10 +46,11 @@ class MemManager { DeleteVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, uint64_t lsn) = 0; virtual Status - Flush(const std::string& collection_id, bool apply_delete = true) = 0; + Flush(const std::string& collection_id) = 0; virtual Status - Flush(std::set& collection_ids, bool apply_delete = true) = 0; + + Flush(std::set& collection_ids) = 0; // virtual Status // Serialize(std::set& table_ids) = 0; diff --git a/core/src/db/insert/MemManagerImpl.cpp b/core/src/db/insert/MemManagerImpl.cpp index 20ab6c4d22..437a031300 100644 --- a/core/src/db/insert/MemManagerImpl.cpp +++ b/core/src/db/insert/MemManagerImpl.cpp @@ -34,17 +34,7 @@ MemManagerImpl::GetMemByTable(const std::string& collection_id) { Status MemManagerImpl::InsertVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim, - const float* vectors, uint64_t lsn, std::set& flushed_tables) { - flushed_tables.clear(); - if (GetCurrentMem() > options_.insert_buffer_size_) { - // TODO(zhiru): Don't apply delete here in order to avoid possible concurrency issues with Merge - auto status = Flush(flushed_tables, false); - fiu_do_on("MemManagerImpl::InsertVectors_flush_fail", status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); - if (!status.ok()) { - return status; - } - } - + const float* vectors, uint64_t lsn) { VectorsData vectors_data; vectors_data.vector_count_ = length; vectors_data.float_data_.resize(length * dim); @@ -60,19 +50,7 @@ MemManagerImpl::InsertVectors(const std::string& collection_id, int64_t length, Status MemManagerImpl::InsertVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim, - const uint8_t* vectors, uint64_t lsn, std::set& flushed_tables) { - flushed_tables.clear(); - if (GetCurrentMem() > options_.insert_buffer_size_) { - LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) - << "Insert buffer size exceeds limit. Performing force flush"; - // TODO(zhiru): Don't apply delete here in order to avoid possible concurrency issues with Merge - auto status = Flush(flushed_tables, false); - if (!status.ok()) { - LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) << "Flush fail: " << status.message(); - return status; - } - } - + const uint8_t* vectors, uint64_t lsn) { VectorsData vectors_data; vectors_data.vector_count_ = length; vectors_data.binary_data_.resize(length * dim); @@ -91,19 +69,7 @@ MemManagerImpl::InsertEntities(const std::string& collection_id, int64_t length, int64_t dim, const float* vectors, const std::unordered_map& attr_nbytes, const std::unordered_map& attr_size, - const std::unordered_map>& attr_data, uint64_t lsn, - std::set& flushed_tables) { - flushed_tables.clear(); - if (GetCurrentMem() > options_.insert_buffer_size_) { - LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) - << "Insert buffer size exceeds limit. Performing force flush"; - auto status = Flush(flushed_tables, false); - if (!status.ok()) { - LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) << "Flush fail: " << status.message(); - return status; - } - } - + const std::unordered_map>& attr_data, uint64_t lsn) { VectorsData vectors_data; vectors_data.vector_count_ = length; vectors_data.float_data_.resize(length * dim); @@ -174,7 +140,7 @@ MemManagerImpl::DeleteVectors(const std::string& collection_id, int64_t length, } Status -MemManagerImpl::Flush(const std::string& collection_id, bool apply_delete) { +MemManagerImpl::Flush(const std::string& collection_id) { ToImmutable(collection_id); // TODO: There is actually only one memTable in the immutable list MemList temp_immutable_list; @@ -187,7 +153,7 @@ MemManagerImpl::Flush(const std::string& collection_id, bool apply_delete) { auto max_lsn = GetMaxLSN(temp_immutable_list); for (auto& mem : temp_immutable_list) { LOG_ENGINE_DEBUG_ << "Flushing collection: " << mem->GetTableId(); - auto status = mem->Serialize(max_lsn, apply_delete); + auto status = mem->Serialize(max_lsn, true); if (!status.ok()) { LOG_ENGINE_ERROR_ << "Flush collection " << mem->GetTableId() << " failed"; return status; @@ -199,7 +165,7 @@ MemManagerImpl::Flush(const std::string& collection_id, bool apply_delete) { } Status -MemManagerImpl::Flush(std::set& collection_ids, bool apply_delete) { +MemManagerImpl::Flush(std::set& collection_ids) { ToImmutable(); MemList temp_immutable_list; @@ -213,7 +179,7 @@ MemManagerImpl::Flush(std::set& collection_ids, bool apply_delete) auto max_lsn = GetMaxLSN(temp_immutable_list); for (auto& mem : temp_immutable_list) { LOG_ENGINE_DEBUG_ << "Flushing collection: " << mem->GetTableId(); - auto status = mem->Serialize(max_lsn, apply_delete); + auto status = mem->Serialize(max_lsn, true); if (!status.ok()) { LOG_ENGINE_ERROR_ << "Flush collection " << mem->GetTableId() << " failed"; return status; diff --git a/core/src/db/insert/MemManagerImpl.h b/core/src/db/insert/MemManagerImpl.h index 061dfe6bc7..0eb957f437 100644 --- a/core/src/db/insert/MemManagerImpl.h +++ b/core/src/db/insert/MemManagerImpl.h @@ -43,18 +43,17 @@ class MemManagerImpl : public MemManager, public server::CacheConfigHandler { Status InsertVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim, - const float* vectors, uint64_t lsn, std::set& flushed_tables) override; + const float* vectors, uint64_t lsn) override; Status InsertVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim, - const uint8_t* vectors, uint64_t lsn, std::set& flushed_tables) override; + const uint8_t* vectors, uint64_t lsn) override; Status InsertEntities(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim, const float* vectors, const std::unordered_map& attr_nbytes, const std::unordered_map& attr_size, - const std::unordered_map>& attr_data, uint64_t lsn, - std::set& flushed_tables) override; + const std::unordered_map>& attr_data, uint64_t lsn) override; Status DeleteVector(const std::string& collection_id, IDNumber vector_id, uint64_t lsn) override; @@ -63,10 +62,10 @@ class MemManagerImpl : public MemManager, public server::CacheConfigHandler { DeleteVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, uint64_t lsn) override; Status - Flush(const std::string& collection_id, bool apply_delete = true) override; + Flush(const std::string& collection_id) override; Status - Flush(std::set& collection_ids, bool apply_delete = true) override; + Flush(std::set& collection_ids) override; // Status // Serialize(std::set& table_ids) override;