diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index cbaa2128b9..2caf9aee14 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -204,6 +204,9 @@ DBImpl::DropCollection(const std::string& collection_name) { // erase cache ClearCollectionCache(ss, options_.meta_.path_); + // clear index failed retry map of this collection + ClearIndexFailedRecord(collection_name); + return snapshots.DropCollection(ss->GetCollectionId(), std::numeric_limits::max()); } @@ -344,42 +347,40 @@ DBImpl::CreateIndex(const std::shared_ptr& context, const std:: auto status = Flush(); WaitMergeFileFinish(); // let merge file thread finish - // step 2: compare old index and new index + // step 2: compare old index and new index, drop old index, set new index CollectionIndex new_index = index; CollectionIndex old_index; STATUS_CHECK(GetSnapshotIndex(collection_name, field_name, old_index)); - if (utils::IsSameIndex(old_index, new_index)) { - return Status::OK(); // same index + if (!utils::IsSameIndex(old_index, new_index)) { + DropIndex(collection_name, field_name); + WaitMergeFileFinish(); // let merge file thread finish since DropIndex start a merge task + + // create field element for new index + status = SetSnapshotIndex(collection_name, field_name, new_index); + if (!status.ok()) { + return status; + } } - // step 3: drop old index - DropIndex(collection_name, field_name); - WaitMergeFileFinish(); // let merge file thread finish since DropIndex start a merge task + // clear index failed retry map of this collection + ClearIndexFailedRecord(collection_name); - // step 4: create field element for index - status = SetSnapshotIndex(collection_name, field_name, new_index); - if (!status.ok()) { - return status; - } - - // step 5: start background build index thread - std::vector collection_names = {collection_name}; - WaitBuildIndexFinish(); - StartBuildIndexTask(collection_names, true); - - // step 6: iterate segments need to be build index, wait until all segments are built + // step 3: iterate segments need to be build index, wait until all segments are built while (true) { + // start background build index thread + std::vector collection_names = {collection_name}; + StartBuildIndexTask(collection_names, true); + + // check if all segments are built SnapshotVisitor ss_visitor(collection_name); snapshot::IDS_TYPE segment_ids; - ss_visitor.SegmentsToIndex(field_name, segment_ids); + ss_visitor.SegmentsToIndex(field_name, segment_ids, true); if (segment_ids.empty()) { break; // all segments build index finished } - snapshot::ScopedSnapshotT ss; - STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name)); - IgnoreIndexFailedSegments(ss->GetCollectionId(), segment_ids); + IgnoreIndexFailedSegments(collection_name, segment_ids); if (segment_ids.empty()) { break; // some segments failed to build index, and ignored } @@ -522,10 +523,8 @@ DBImpl::Insert(const std::string& collection_name, const std::string& partition_ } // do insert - int64_t segment_row_count = DEFAULT_SEGMENT_ROW_COUNT; - if (params.find(PARAM_SEGMENT_ROW_COUNT) != params.end()) { - segment_row_count = params[PARAM_SEGMENT_ROW_COUNT]; - } + int64_t segment_row_count = 0; + GetSegmentRowCount(ss->GetCollection(), segment_row_count); int64_t collection_id = ss->GetCollectionId(); int64_t partition_id = partition->GetID(); @@ -538,7 +537,9 @@ DBImpl::Insert(const std::string& collection_name, const std::string& partition_ if (!status.ok()) { return status; } - if (mem_mgr_->GetCurrentMem() > options_.insert_buffer_size_) { + + std::set collection_ids; + if (mem_mgr_->RequireFlush(collection_ids)) { LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) << "Insert buffer size exceeds limit. Force flush"; InternalFlush(); } @@ -583,7 +584,20 @@ DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNum } status = mem_mgr_->DeleteEntities(ss->GetCollectionId(), entity_ids, op_id); - return status; + if (!status.ok()) { + return status; + } + + std::set collection_ids; + if (mem_mgr_->RequireFlush(collection_ids)) { + if (collection_ids.find(ss->GetCollectionId()) != collection_ids.end()) { + LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "delete", 0) + << "Delete count in buffer exceeds limit. Force flush"; + InternalFlush(collection_name); + } + } + + return Status::OK(); } Status @@ -962,7 +976,7 @@ DBImpl::TimingMetricThread() { } void -DBImpl::StartBuildIndexTask(const std::vector& collection_names, bool reset_retry_times) { +DBImpl::StartBuildIndexTask(const std::vector& collection_names, bool force_build) { if (collection_names.empty()) { return; // no need to start thread } @@ -982,19 +996,14 @@ DBImpl::StartBuildIndexTask(const std::vector& collection_names, bo { std::lock_guard lck(index_result_mutex_); if (index_thread_results_.empty()) { - if (reset_retry_times) { - std::lock_guard lock(index_retry_mutex_); - index_retry_map_.clear(); // reset index retry times - } - index_thread_results_.push_back( - index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndexTask, this, collection_names)); + index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndexTask, this, collection_names, force_build)); } } } void -DBImpl::BackgroundBuildIndexTask(std::vector collection_names) { +DBImpl::BackgroundBuildIndexTask(std::vector collection_names, bool force_build) { SetThreadName("build_index"); std::unique_lock lock(build_index_mutex_); @@ -1008,14 +1017,13 @@ DBImpl::BackgroundBuildIndexTask(std::vector collection_names) { SnapshotVisitor ss_visitor(latest_ss); snapshot::IDS_TYPE segment_ids; - ss_visitor.SegmentsToIndex("", segment_ids); + ss_visitor.SegmentsToIndex("", segment_ids, force_build); if (segment_ids.empty()) { continue; } // check index retry times - snapshot::ID_TYPE collection_id = latest_ss->GetCollectionId(); - IgnoreIndexFailedSegments(collection_id, segment_ids); + IgnoreIndexFailedSegments(collection_name, segment_ids); if (segment_ids.empty()) { continue; } @@ -1034,7 +1042,7 @@ DBImpl::BackgroundBuildIndexTask(std::vector collection_names) { // record failed segments, avoid build index hang snapshot::IDS_TYPE& failed_ids = job->FailedSegments(); - MarkIndexFailedSegments(collection_id, failed_ids); + MarkIndexFailedSegments(collection_name, failed_ids); if (!job->status().ok()) { LOG_ENGINE_ERROR_ << job->status().message(); @@ -1177,18 +1185,18 @@ DBImpl::ConfigUpdate(const std::string& name) { } void -DBImpl::MarkIndexFailedSegments(snapshot::ID_TYPE collection_id, const snapshot::IDS_TYPE& failed_ids) { +DBImpl::MarkIndexFailedSegments(const std::string& collection_name, const snapshot::IDS_TYPE& failed_ids) { std::lock_guard lock(index_retry_mutex_); - SegmentIndexRetryMap& retry_map = index_retry_map_[collection_id]; + SegmentIndexRetryMap& retry_map = index_retry_map_[collection_name]; for (auto& id : failed_ids) { retry_map[id]++; } } void -DBImpl::IgnoreIndexFailedSegments(snapshot::ID_TYPE collection_id, snapshot::IDS_TYPE& segment_ids) { +DBImpl::IgnoreIndexFailedSegments(const std::string& collection_name, snapshot::IDS_TYPE& segment_ids) { std::lock_guard lock(index_retry_mutex_); - SegmentIndexRetryMap& retry_map = index_retry_map_[collection_id]; + SegmentIndexRetryMap& retry_map = index_retry_map_[collection_name]; snapshot::IDS_TYPE segment_ids_to_build; for (auto id : segment_ids) { if (retry_map[id] < BUILD_INEDX_RETRY_TIMES) { @@ -1198,5 +1206,11 @@ DBImpl::IgnoreIndexFailedSegments(snapshot::ID_TYPE collection_id, snapshot::IDS segment_ids.swap(segment_ids_to_build); } +void +DBImpl::ClearIndexFailedRecord(const std::string& collection_name) { + std::lock_guard lock(index_retry_mutex_); + index_retry_map_.erase(collection_name); +} + } // namespace engine } // namespace milvus diff --git a/core/src/db/DBImpl.h b/core/src/db/DBImpl.h index d898eb3ec4..56978abfd9 100644 --- a/core/src/db/DBImpl.h +++ b/core/src/db/DBImpl.h @@ -140,10 +140,10 @@ class DBImpl : public DB, public ConfigObserver { TimingMetricThread(); void - StartBuildIndexTask(const std::vector& collection_names, bool reset_retry_times); + StartBuildIndexTask(const std::vector& collection_names, bool force_build); void - BackgroundBuildIndexTask(std::vector collection_names); + BackgroundBuildIndexTask(std::vector collection_names, bool force_build); void TimingIndexThread(); @@ -173,10 +173,13 @@ class DBImpl : public DB, public ConfigObserver { DecreaseLiveBuildTaskNum(); void - MarkIndexFailedSegments(snapshot::ID_TYPE collection_id, const snapshot::IDS_TYPE& failed_ids); + MarkIndexFailedSegments(const std::string& collection_name, const snapshot::IDS_TYPE& failed_ids); void - IgnoreIndexFailedSegments(snapshot::ID_TYPE collection_id, snapshot::IDS_TYPE& segment_ids); + IgnoreIndexFailedSegments(const std::string& collection_name, snapshot::IDS_TYPE& segment_ids); + + void + ClearIndexFailedRecord(const std::string& collection_name); private: DBOptions options_; @@ -205,7 +208,7 @@ class DBImpl : public DB, public ConfigObserver { std::list> index_thread_results_; using SegmentIndexRetryMap = std::unordered_map; - using CollectionIndexRetryMap = std::unordered_map; + using CollectionIndexRetryMap = std::unordered_map; CollectionIndexRetryMap index_retry_map_; std::mutex index_retry_mutex_; diff --git a/core/src/db/SnapshotHandlers.cpp b/core/src/db/SnapshotHandlers.cpp index f29912fb5a..2bdf307387 100644 --- a/core/src/db/SnapshotHandlers.cpp +++ b/core/src/db/SnapshotHandlers.cpp @@ -11,7 +11,6 @@ #include "db/SnapshotHandlers.h" -#include "config/ServerConfig.h" #include "db/SnapshotUtils.h" #include "db/SnapshotVisitor.h" #include "db/Types.h" @@ -40,9 +39,8 @@ SegmentsToSearchCollector::Handle(const snapshot::SegmentCommitPtr& segment_comm /////////////////////////////////////////////////////////////////////////////// SegmentsToIndexCollector::SegmentsToIndexCollector(snapshot::ScopedSnapshotT ss, const std::string& field_name, - snapshot::IDS_TYPE& segment_ids) - : BaseT(ss), field_name_(field_name), segment_ids_(segment_ids) { - build_index_threshold_ = config.engine.build_index_threshold(); + snapshot::IDS_TYPE& segment_ids, int64_t build_index_threshold) + : BaseT(ss), field_name_(field_name), segment_ids_(segment_ids), build_index_threshold_(build_index_threshold) { } Status diff --git a/core/src/db/SnapshotHandlers.h b/core/src/db/SnapshotHandlers.h index 88aa348e5e..7586da7b9d 100644 --- a/core/src/db/SnapshotHandlers.h +++ b/core/src/db/SnapshotHandlers.h @@ -42,7 +42,7 @@ struct SegmentsToIndexCollector : public snapshot::SegmentCommitIterator { using ResourceT = snapshot::SegmentCommit; using BaseT = snapshot::IterateHandler; SegmentsToIndexCollector(snapshot::ScopedSnapshotT ss, const std::string& field_name, - snapshot::IDS_TYPE& segment_ids); + snapshot::IDS_TYPE& segment_ids, int64_t build_index_threshold); Status Handle(const typename ResourceT::Ptr&) override; diff --git a/core/src/db/SnapshotUtils.cpp b/core/src/db/SnapshotUtils.cpp index 5a27ad2ca2..b1cda0af98 100644 --- a/core/src/db/SnapshotUtils.cpp +++ b/core/src/db/SnapshotUtils.cpp @@ -266,28 +266,27 @@ GetSnapshotInfo(const std::string& collection_name, milvus::json& json_info) { Status GetSegmentRowCount(const std::string& collection_name, int64_t& segment_row_count) { - segment_row_count = DEFAULT_SEGMENT_ROW_COUNT; snapshot::ScopedSnapshotT latest_ss; STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name)); // get row count per segment auto collection = latest_ss->GetCollection(); - const json params = collection->GetParams(); - if (params.find(PARAM_SEGMENT_ROW_COUNT) != params.end()) { - segment_row_count = params[PARAM_SEGMENT_ROW_COUNT]; - } - - return Status::OK(); + return GetSegmentRowCount(collection, segment_row_count); } Status GetSegmentRowCount(int64_t collection_id, int64_t& segment_row_count) { - segment_row_count = DEFAULT_SEGMENT_ROW_COUNT; snapshot::ScopedSnapshotT latest_ss; STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_id)); // get row count per segment auto collection = latest_ss->GetCollection(); + return GetSegmentRowCount(collection, segment_row_count); +} + +Status +GetSegmentRowCount(const snapshot::CollectionPtr& collection, int64_t& segment_row_count) { + segment_row_count = DEFAULT_SEGMENT_ROW_COUNT; const json params = collection->GetParams(); if (params.find(PARAM_SEGMENT_ROW_COUNT) != params.end()) { segment_row_count = params[PARAM_SEGMENT_ROW_COUNT]; @@ -343,5 +342,18 @@ ClearIndexCache(snapshot::ScopedSnapshotT& ss, const std::string& dir_root, cons return Status::OK(); } +Status +DropSegment(snapshot::ScopedSnapshotT& ss, snapshot::ID_TYPE segment_id) { + snapshot::OperationContext drop_seg_context; + auto segment = ss->GetResource(segment_id); + if (segment == nullptr) { + return Status(DB_ERROR, "Invalid segment id"); + } + + drop_seg_context.prev_segment = segment; + auto drop_op = std::make_shared(drop_seg_context, ss); + return drop_op->Push(); +} + } // namespace engine } // namespace milvus diff --git a/core/src/db/SnapshotUtils.h b/core/src/db/SnapshotUtils.h index f9dfc177b2..191b947f30 100644 --- a/core/src/db/SnapshotUtils.h +++ b/core/src/db/SnapshotUtils.h @@ -59,6 +59,9 @@ GetSegmentRowCount(const std::string& collection_name, int64_t& segment_row_coun Status GetSegmentRowCount(int64_t collection_id, int64_t& segment_row_count); +Status +GetSegmentRowCount(const snapshot::CollectionPtr& collection, int64_t& segment_row_count); + Status ClearCollectionCache(snapshot::ScopedSnapshotT& ss, const std::string& dir_root); @@ -68,5 +71,8 @@ ClearPartitionCache(snapshot::ScopedSnapshotT& ss, const std::string& dir_root, Status ClearIndexCache(snapshot::ScopedSnapshotT& ss, const std::string& dir_root, const std::string& field_name); +Status +DropSegment(snapshot::ScopedSnapshotT& ss, snapshot::ID_TYPE segment_id); + } // namespace engine } // namespace milvus diff --git a/core/src/db/SnapshotVisitor.cpp b/core/src/db/SnapshotVisitor.cpp index d218867b7e..d9acc70067 100644 --- a/core/src/db/SnapshotVisitor.cpp +++ b/core/src/db/SnapshotVisitor.cpp @@ -10,12 +10,14 @@ // or implied. See the License for the specific language governing permissions and limitations under the License. #include "db/SnapshotVisitor.h" -#include - +#include "config/ServerConfig.h" #include "db/SnapshotHandlers.h" +#include "db/SnapshotUtils.h" #include "db/Types.h" #include "db/snapshot/Snapshots.h" +#include + namespace milvus { namespace engine { @@ -41,10 +43,19 @@ SnapshotVisitor::SegmentsToSearch(snapshot::IDS_TYPE& segment_ids) { } Status -SnapshotVisitor::SegmentsToIndex(const std::string& field_name, snapshot::IDS_TYPE& segment_ids) { +SnapshotVisitor::SegmentsToIndex(const std::string& field_name, snapshot::IDS_TYPE& segment_ids, bool force_build) { STATUS_CHECK(status_); - auto handler = std::make_shared(ss_, field_name, segment_ids); + // force_build means client invoke create_index, + // all segments whose row_count greater than config.build_index_threshold will be counted in. + // else, only the segments whose row_count greater than segment_row_count will be counted in + int64_t build_index_threshold = config.engine.build_index_threshold.value; + if (!force_build) { + auto collection = ss_->GetCollection(); + GetSegmentRowCount(collection, build_index_threshold); + } + + auto handler = std::make_shared(ss_, field_name, segment_ids, build_index_threshold); handler->Iterate(); return handler->GetStatus(); diff --git a/core/src/db/SnapshotVisitor.h b/core/src/db/SnapshotVisitor.h index 2135650a91..5a37240f29 100644 --- a/core/src/db/SnapshotVisitor.h +++ b/core/src/db/SnapshotVisitor.h @@ -31,7 +31,7 @@ class SnapshotVisitor { SegmentsToSearch(snapshot::IDS_TYPE& segment_ids); Status - SegmentsToIndex(const std::string& field_name, snapshot::IDS_TYPE& segment_ids); + SegmentsToIndex(const std::string& field_name, snapshot::IDS_TYPE& segment_ids, bool force_build); protected: snapshot::ScopedSnapshotT ss_; diff --git a/core/src/db/insert/MemCollection.cpp b/core/src/db/insert/MemCollection.cpp index 937641c9d3..8067335d96 100644 --- a/core/src/db/insert/MemCollection.cpp +++ b/core/src/db/insert/MemCollection.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -102,6 +103,11 @@ MemCollection::Delete(const std::vector& ids, idx_t op_id) { return Status::OK(); } +size_t +MemCollection::DeleteCount() const { + return ids_to_delete_.size(); +} + Status MemCollection::EraseMem(int64_t partition_id) { std::lock_guard lock(mem_mutex_); @@ -132,6 +138,7 @@ MemCollection::Serialize() { break; } } + recorder.RecordSection("ApplyDeleteToFile"); // serialize mem to new segment files // delete ids will be applied in MemSegment::Serialize() method @@ -168,116 +175,73 @@ MemCollection::ApplyDeleteToFile() { int64_t segment_iterated = 0; auto segment_executor = [&](const snapshot::SegmentPtr& segment, snapshot::SegmentIterator* iterator) -> Status { + TimeRecorder recorder("MemCollection::ApplyDeleteToFile collection " + std::to_string(collection_id_) + + " segment " + std::to_string(segment->GetID())); segment_iterated++; auto seg_visitor = engine::SegmentVisitor::Build(ss, segment->GetID()); segment::SegmentReaderPtr segment_reader = std::make_shared(options_.meta_.path_, seg_visitor); - // Step 1: Check delete_id in mem - std::set ids_to_check; - { - segment::IdBloomFilterPtr pre_bloom_filter; - STATUS_CHECK(segment_reader->LoadBloomFilter(pre_bloom_filter)); - for (auto& id : ids_to_delete_) { - if (pre_bloom_filter->Check(id)) { - ids_to_check.insert(id); - } + // Step 1: Check to-delete id possibly in this segment + std::unordered_set ids_to_check; + segment::IdBloomFilterPtr pre_bloom_filter; + STATUS_CHECK(segment_reader->LoadBloomFilter(pre_bloom_filter)); + for (auto& id : ids_to_delete_) { + if (pre_bloom_filter->Check(id)) { + ids_to_check.insert(id); } } if (ids_to_check.empty()) { - return Status::OK(); + return Status::OK(); // nothing change for this segment } + // load entity ids std::vector uids; STATUS_CHECK(segment_reader->LoadUids(uids)); - // Step 2: Mark previous deleted docs file and bloom filter file stale - auto& field_visitors_map = seg_visitor->GetFieldVisitors(); - auto uid_field_visitor = seg_visitor->GetFieldVisitor(engine::FIELD_UID); - auto del_doc_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_DELETED_DOCS); - auto del_docs_element = del_doc_visitor->GetElement(); - auto blm_filter_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_BLOOM_FILTER); - auto blm_filter_element = blm_filter_visitor->GetElement(); - - auto segment_file_executor = [&](const snapshot::SegmentFilePtr& segment_file, - snapshot::SegmentFileIterator* iterator) -> Status { - if (segment_file->GetSegmentId() == segment->GetID() && - (segment_file->GetFieldElementId() == del_docs_element->GetID() || - segment_file->GetFieldElementId() == blm_filter_element->GetID())) { - segments_op->AddStaleSegmentFile(segment_file); + // Load previous deleted offsets + segment::DeletedDocsPtr prev_del_docs; + STATUS_CHECK(segment_reader->LoadDeletedDocs(prev_del_docs)); + std::unordered_set del_offsets; + if (prev_del_docs) { + auto prev_del_offsets = prev_del_docs->GetDeletedDocs(); + for (auto offset : prev_del_offsets) { + del_offsets.insert(offset); } + } + uint64_t prev_del_count = del_offsets.size(); - return Status::OK(); - }; + // if the to-delete id is actually in this segment, remove it from bloom filter, and record its offset + segment::IdBloomFilterPtr bloom_filter; + pre_bloom_filter->Clone(bloom_filter); - auto segment_file_iterator = std::make_shared(ss, segment_file_executor); - segment_file_iterator->Iterate(); - STATUS_CHECK(segment_file_iterator->GetStatus()); - - // Step 3: Create new deleted docs file and bloom filter file - snapshot::SegmentFileContext del_file_context; - del_file_context.field_name = uid_field_visitor->GetField()->GetName(); - del_file_context.field_element_name = del_docs_element->GetName(); - del_file_context.collection_id = segment->GetCollectionId(); - del_file_context.partition_id = segment->GetPartitionId(); - del_file_context.segment_id = segment->GetID(); - snapshot::SegmentFilePtr delete_file; - STATUS_CHECK(segments_op->CommitNewSegmentFile(del_file_context, delete_file)); - - std::string collection_root_path = options_.meta_.path_ + COLLECTIONS_FOLDER; - auto segment_writer = std::make_shared(options_.meta_.path_, seg_visitor); - - std::string del_docs_path = snapshot::GetResPath(collection_root_path, delete_file); - - snapshot::SegmentFileContext bloom_file_context; - bloom_file_context.field_name = uid_field_visitor->GetField()->GetName(); - bloom_file_context.field_element_name = blm_filter_element->GetName(); - bloom_file_context.collection_id = segment->GetCollectionId(); - bloom_file_context.partition_id = segment->GetPartitionId(); - bloom_file_context.segment_id = segment->GetID(); - - engine::snapshot::SegmentFile::Ptr bloom_filter_file; - STATUS_CHECK(segments_op->CommitNewSegmentFile(bloom_file_context, bloom_filter_file)); - - std::string bloom_filter_file_path = - snapshot::GetResPath(collection_root_path, bloom_filter_file); - - // Step 4: update delete docs and bloom filter - { - // Load previous delete_id and merge into 'delete_ids' - segment::DeletedDocsPtr prev_del_docs; - STATUS_CHECK(segment_reader->LoadDeletedDocs(prev_del_docs)); - std::vector pre_del_offsets; - if (prev_del_docs) { - pre_del_offsets = prev_del_docs->GetDeletedDocs(); - for (auto& offset : pre_del_offsets) { - ids_to_check.insert(uids[offset]); - } + for (size_t i = 0; i < uids.size(); i++) { + auto id = uids[i]; + if (ids_to_check.find(id) != ids_to_check.end()) { + del_offsets.insert(i); + bloom_filter->Remove(id); } - - segment::IdBloomFilterPtr bloom_filter = std::make_shared(uids.size()); - std::vector delete_docs_offset; - for (size_t i = 0; i < uids.size(); i++) { - if (std::binary_search(ids_to_check.begin(), ids_to_check.end(), uids[i])) { - delete_docs_offset.emplace_back(i); - } else { - bloom_filter->Add(uids[i]); - } - } - - STATUS_CHECK(segments_op->CommitRowCountDelta(segment->GetID(), - delete_docs_offset.size() - pre_del_offsets.size(), true)); - - auto delete_docs = std::make_shared(delete_docs_offset); - STATUS_CHECK(segment_writer->WriteDeletedDocs(del_docs_path, delete_docs)); - STATUS_CHECK(segment_writer->WriteBloomFilter(bloom_filter_file_path, bloom_filter)); } - delete_file->SetSize(CommonUtil::GetFileSize(del_docs_path + codec::DeletedDocsFormat::FilePostfix())); - bloom_filter_file->SetSize( - CommonUtil::GetFileSize(bloom_filter_file_path + codec::IdBloomFilterFormat::FilePostfix())); + uint64_t new_deleted = del_offsets.size() - prev_del_count; + if (new_deleted == 0) { + return Status::OK(); // nothing change for this segment + } + recorder.RecordSection("detect deleted " + std::to_string(new_deleted) + " entities"); + + // Step 3: + // all entities have been deleted? drop this segment + if (del_offsets.size() == uids.size()) { + return DropSegment(ss, segment->GetID()); + } + + // create new deleted docs file and bloom filter file + STATUS_CHECK( + CreateDeletedDocsBloomFilter(segments_op, ss, seg_visitor, del_offsets, new_deleted, bloom_filter)); + + recorder.RecordSection("write deleted docs and bloom filter"); return Status::OK(); }; @@ -293,6 +257,82 @@ MemCollection::ApplyDeleteToFile() { return segments_op->Push(); } +Status +MemCollection::CreateDeletedDocsBloomFilter(const std::shared_ptr& segments_op, + const snapshot::ScopedSnapshotT& ss, engine::SegmentVisitorPtr& seg_visitor, + const std::unordered_set& del_offsets, + uint64_t new_deleted, segment::IdBloomFilterPtr& bloom_filter) { + // Step 1: Mark previous deleted docs file and bloom filter file stale + const snapshot::SegmentPtr& segment = seg_visitor->GetSegment(); + auto& field_visitors_map = seg_visitor->GetFieldVisitors(); + auto uid_field_visitor = seg_visitor->GetFieldVisitor(engine::FIELD_UID); + auto del_doc_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_DELETED_DOCS); + auto del_docs_element = del_doc_visitor->GetElement(); + auto blm_filter_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_BLOOM_FILTER); + auto blm_filter_element = blm_filter_visitor->GetElement(); + + auto segment_file_executor = [&](const snapshot::SegmentFilePtr& segment_file, + snapshot::SegmentFileIterator* iterator) -> Status { + if (segment_file->GetSegmentId() == segment->GetID() && + (segment_file->GetFieldElementId() == del_docs_element->GetID() || + segment_file->GetFieldElementId() == blm_filter_element->GetID())) { + segments_op->AddStaleSegmentFile(segment_file); + } + + return Status::OK(); + }; + + auto segment_file_iterator = std::make_shared(ss, segment_file_executor); + segment_file_iterator->Iterate(); + STATUS_CHECK(segment_file_iterator->GetStatus()); + + // Step 2: Create new deleted docs file and bloom filter file + snapshot::SegmentFileContext del_file_context; + del_file_context.field_name = uid_field_visitor->GetField()->GetName(); + del_file_context.field_element_name = del_docs_element->GetName(); + del_file_context.collection_id = segment->GetCollectionId(); + del_file_context.partition_id = segment->GetPartitionId(); + del_file_context.segment_id = segment->GetID(); + snapshot::SegmentFilePtr delete_file; + STATUS_CHECK(segments_op->CommitNewSegmentFile(del_file_context, delete_file)); + + std::string collection_root_path = options_.meta_.path_ + COLLECTIONS_FOLDER; + auto segment_writer = std::make_shared(options_.meta_.path_, seg_visitor); + + std::string del_docs_path = snapshot::GetResPath(collection_root_path, delete_file); + + snapshot::SegmentFileContext bloom_file_context; + bloom_file_context.field_name = uid_field_visitor->GetField()->GetName(); + bloom_file_context.field_element_name = blm_filter_element->GetName(); + bloom_file_context.collection_id = segment->GetCollectionId(); + bloom_file_context.partition_id = segment->GetPartitionId(); + bloom_file_context.segment_id = segment->GetID(); + + engine::snapshot::SegmentFile::Ptr bloom_filter_file; + STATUS_CHECK(segments_op->CommitNewSegmentFile(bloom_file_context, bloom_filter_file)); + + std::string bloom_filter_file_path = + snapshot::GetResPath(collection_root_path, bloom_filter_file); + + // Step 3: update delete docs and bloom filter + STATUS_CHECK(segments_op->CommitRowCountDelta(segment->GetID(), new_deleted, true)); + + std::vector vec_del_offsets; + vec_del_offsets.reserve(del_offsets.size()); + for (auto offset : del_offsets) { + vec_del_offsets.push_back(offset); + } + auto delete_docs = std::make_shared(vec_del_offsets); + STATUS_CHECK(segment_writer->WriteDeletedDocs(del_docs_path, delete_docs)); + STATUS_CHECK(segment_writer->WriteBloomFilter(bloom_filter_file_path, bloom_filter)); + + delete_file->SetSize(CommonUtil::GetFileSize(del_docs_path + codec::DeletedDocsFormat::FilePostfix())); + bloom_filter_file->SetSize( + CommonUtil::GetFileSize(bloom_filter_file_path + codec::IdBloomFilterFormat::FilePostfix())); + + return Status::OK(); +} + int64_t MemCollection::GetCollectionId() const { return collection_id_; diff --git a/core/src/db/insert/MemCollection.h b/core/src/db/insert/MemCollection.h index f4046e924a..0215842768 100644 --- a/core/src/db/insert/MemCollection.h +++ b/core/src/db/insert/MemCollection.h @@ -22,7 +22,9 @@ #include #include "config/ConfigMgr.h" +#include "db/SnapshotVisitor.h" #include "db/insert/MemSegment.h" +#include "db/snapshot/Snapshots.h" #include "utils/Status.h" namespace milvus { @@ -43,6 +45,9 @@ class MemCollection { Status Delete(const std::vector& ids, idx_t op_id); + size_t + DeleteCount() const; + Status EraseMem(int64_t partition_id); @@ -59,6 +64,12 @@ class MemCollection { Status ApplyDeleteToFile(); + Status + CreateDeletedDocsBloomFilter(const std::shared_ptr& segments_op, + const snapshot::ScopedSnapshotT& ss, engine::SegmentVisitorPtr& seg_visitor, + const std::unordered_set& del_offsets, uint64_t new_deleted, + segment::IdBloomFilterPtr& bloom_filter); + private: int64_t collection_id_ = 0; DBOptions options_; diff --git a/core/src/db/insert/MemManager.h b/core/src/db/insert/MemManager.h index 3e48607672..55e3b6622e 100644 --- a/core/src/db/insert/MemManager.h +++ b/core/src/db/insert/MemManager.h @@ -44,14 +44,8 @@ class MemManager { virtual Status EraseMem(int64_t collection_id, int64_t partition_id) = 0; - virtual size_t - GetCurrentMutableMem() = 0; - - virtual size_t - GetCurrentImmutableMem() = 0; - - virtual size_t - GetCurrentMem() = 0; + virtual bool + RequireFlush(std::set& collection_ids) = 0; }; using MemManagerPtr = std::shared_ptr; diff --git a/core/src/db/insert/MemManagerImpl.cpp b/core/src/db/insert/MemManagerImpl.cpp index 54942ba05b..afbfc764d7 100644 --- a/core/src/db/insert/MemManagerImpl.cpp +++ b/core/src/db/insert/MemManagerImpl.cpp @@ -281,6 +281,20 @@ MemManagerImpl::EraseMem(int64_t collection_id, int64_t partition_id) { return Status::OK(); } +bool +MemManagerImpl::RequireFlush(std::set& collection_ids) { + bool require_flush = false; + if (GetCurrentMem() > options_.insert_buffer_size_) { + std::lock_guard lock(mem_mutex_); + for (auto& kv : mem_map_) { + collection_ids.insert(kv.first); + } + require_flush = true; + } + + return require_flush; +} + size_t MemManagerImpl::GetCurrentMutableMem() { size_t total_mem = 0; diff --git a/core/src/db/insert/MemManagerImpl.h b/core/src/db/insert/MemManagerImpl.h index ef744487f2..4ff1ab34a0 100644 --- a/core/src/db/insert/MemManagerImpl.h +++ b/core/src/db/insert/MemManagerImpl.h @@ -56,16 +56,19 @@ class MemManagerImpl : public MemManager { Status EraseMem(int64_t collection_id, int64_t partition_id) override; - size_t - GetCurrentMutableMem() override; - - size_t - GetCurrentImmutableMem() override; - - size_t - GetCurrentMem() override; + bool + RequireFlush(std::set& collection_ids) override; private: + size_t + GetCurrentMutableMem(); + + size_t + GetCurrentImmutableMem(); + + size_t + GetCurrentMem(); + MemCollectionPtr GetMemByCollection(int64_t collection_id); diff --git a/core/src/db/insert/MemSegment.cpp b/core/src/db/insert/MemSegment.cpp index 176f7ff2b3..83e8606610 100644 --- a/core/src/db/insert/MemSegment.cpp +++ b/core/src/db/insert/MemSegment.cpp @@ -59,6 +59,18 @@ MemSegment::Delete(const std::vector& ids, idx_t op_id) { return Status::OK(); } + // previous action is delete? combine delete action + if (!actions_.empty()) { + MemAction& pre_action = *actions_.rbegin(); + if (!pre_action.delete_ids_.empty()) { + for (auto& id : ids) { + pre_action.delete_ids_.insert(id); + } + return Status::OK(); + } + } + + // create new action MemAction action; action.op_id_ = op_id; for (auto& id : ids) { diff --git a/core/src/db/merge/MergeManagerImpl.cpp b/core/src/db/merge/MergeManagerImpl.cpp index e2eedbcb79..39be3ad38b 100644 --- a/core/src/db/merge/MergeManagerImpl.cpp +++ b/core/src/db/merge/MergeManagerImpl.cpp @@ -10,6 +10,7 @@ // or implied. See the License for the specific language governing permissions and limitations under the License. #include "db/merge/MergeManagerImpl.h" +#include "db/SnapshotUtils.h" #include "db/merge/MergeLayerStrategy.h" #include "db/merge/MergeSimpleStrategy.h" #include "db/merge/MergeTask.h" @@ -79,11 +80,8 @@ MergeManagerImpl::MergeSegments(int64_t collection_id, MergeStrategyType type) { // get row count per segment auto collection = latest_ss->GetCollection(); - int64_t row_count_per_segment = DEFAULT_SEGMENT_ROW_COUNT; - const json params = collection->GetParams(); - if (params.find(PARAM_SEGMENT_ROW_COUNT) != params.end()) { - row_count_per_segment = params[PARAM_SEGMENT_ROW_COUNT]; - } + int64_t row_count_per_segment = 0; + GetSegmentRowCount(collection, row_count_per_segment); // distribute segments to groups by some strategy SegmentGroups segment_groups; diff --git a/core/src/db/wal/WalManager.cpp b/core/src/db/wal/WalManager.cpp index 65103d0f0a..baa4815270 100644 --- a/core/src/db/wal/WalManager.cpp +++ b/core/src/db/wal/WalManager.cpp @@ -13,6 +13,7 @@ #include "db/Utils.h" #include "db/wal/WalOperationCodec.h" #include "utils/CommonUtil.h" +#include "utils/Log.h" #include #include @@ -180,6 +181,8 @@ Status WalManager::Recovery(const DBPtr& db, const CollectionMaxOpIDMap& max_op_ids) { WaitCleanupFinish(); + LOG_ENGINE_DEBUG_ << "Begin wal recovery"; + try { using DirectoryIterator = std::experimental::filesystem::recursive_directory_iterator; DirectoryIterator iter_outer(wal_path_); @@ -240,6 +243,8 @@ WalManager::Recovery(const DBPtr& db, const CollectionMaxOpIDMap& max_op_ids) { return Status(DB_ERROR, msg); } + LOG_ENGINE_DEBUG_ << "Wal recovery finished"; + return Status::OK(); } diff --git a/core/src/segment/Segment.cpp b/core/src/segment/Segment.cpp index fa6dfb12b3..0db2aeef8a 100644 --- a/core/src/segment/Segment.cpp +++ b/core/src/segment/Segment.cpp @@ -19,6 +19,7 @@ #include "db/SnapshotUtils.h" #include "db/snapshot/Snapshots.h" #include "knowhere/index/vector_index/helpers/IndexParameter.h" +#include "segment/Utils.h" #include "utils/Log.h" #include @@ -300,8 +301,11 @@ Segment::DeleteEntity(std::vector& offsets) { if (offsets.size() == 0) { return Status::OK(); } - // sort offset in descendant - std::sort(offsets.begin(), offsets.end(), std::greater<>()); + + // calculate copy ranges + int64_t delete_count = 0; + segment::CopyRanges copy_ranges; + segment::CalcCopyRangesWithOffset(offsets, row_count_, copy_ranges, delete_count); // delete entity data from max offset to min offset for (auto& pair : fixed_fields_) { @@ -311,20 +315,14 @@ Segment::DeleteEntity(std::vector& offsets) { } auto& data = pair.second; - for (auto offset : offsets) { - if (offset >= 0 && offset < row_count_) { - auto step = offset * width; - data->data_.erase(data->data_.begin() + step, data->data_.begin() + step + width); - } - } + + std::vector new_data; + segment::CopyDataWithRanges(data->data_, width, copy_ranges, new_data); + data->data_.swap(new_data); } // reset row count - for (auto offset : offsets) { - if (offset >= 0 && offset < row_count_) { - row_count_--; - } - } + row_count_ -= delete_count; return Status::OK(); } diff --git a/core/src/segment/SegmentWriter.cpp b/core/src/segment/SegmentWriter.cpp index cd365a7482..1e560da870 100644 --- a/core/src/segment/SegmentWriter.cpp +++ b/core/src/segment/SegmentWriter.cpp @@ -237,7 +237,7 @@ SegmentWriter::Merge(const SegmentReaderPtr& segment_reader) { LOG_ENGINE_DEBUG_ << "Merging from " << segment_reader->GetSegmentPath() << " to " << GetSegmentPath(); - TimeRecorderAuto recorder("SegmentWriter::Merge"); + TimeRecorder recorder("SegmentWriter::Merge"); // load raw data // After load fields, the data has been cached in segment. @@ -259,6 +259,8 @@ SegmentWriter::Merge(const SegmentReaderPtr& segment_reader) { return status; } + recorder.RecordSection("load data"); + // the source segment may be used in search, we can't change its data, so copy a new segment for merging engine::SegmentPtr duplicated_segment = std::make_shared(); src_segment->CopyOutRawData(duplicated_segment); @@ -267,6 +269,8 @@ SegmentWriter::Merge(const SegmentReaderPtr& segment_reader) { duplicated_segment->DeleteEntity(delete_ids); } + recorder.RecordSection("delete entities"); + // convert to DataChunk engine::DataChunkPtr chunk = std::make_shared(); duplicated_segment->ShareToChunkData(chunk); @@ -280,6 +284,8 @@ SegmentWriter::Merge(const SegmentReaderPtr& segment_reader) { // clear cache of merged segment segment_reader->ClearCache(); + recorder.ElapseFromBegin("done"); + // Note: no need to merge bloom filter, the bloom filter will be created during serialize return Status::OK(); diff --git a/core/src/segment/Utils.cpp b/core/src/segment/Utils.cpp new file mode 100644 index 0000000000..7c181b800c --- /dev/null +++ b/core/src/segment/Utils.cpp @@ -0,0 +1,104 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include "segment/Utils.h" + +#include +#include +#include +#include + +#include "utils/Log.h" + +namespace milvus { +namespace segment { + +bool +CalcCopyRangesWithOffset(const std::vector& offsets, int64_t row_count, CopyRanges& copy_ranges, + int64_t& delete_count) { + copy_ranges.clear(); + if (offsets.empty() || row_count <= 0) { + return false; + } + + // arrange offsets + std::set new_offsets; + for (auto offset : offsets) { + if (offset < 0 || offset >= row_count) { + continue; + } + new_offsets.insert(offset); + } + delete_count = new_offsets.size(); + if (delete_count == 0) { + return true; + } + + // if the first offset is not zero, add a range [0, first] + int32_t first = *new_offsets.begin(); + if (first > 0) { + copy_ranges.push_back(std::make_pair(0, first)); + } + + // calculate inner range + int32_t prev = *new_offsets.begin(); + for (auto offset : new_offsets) { + if (offset - prev == 1) { + prev = offset; + continue; + } else { + if (prev != offset) { + copy_ranges.push_back(std::make_pair(prev + 1, offset)); + } + } + prev = offset; + } + + // if the last offset is not the last row, add a range [last + 1, row_count] + int32_t last = *new_offsets.rbegin(); + if (last < row_count - 1) { + copy_ranges.push_back(std::make_pair(last + 1, row_count)); + } + + return true; +} + +bool +CopyDataWithRanges(const std::vector& src_data, int64_t row_width, const CopyRanges& copy_ranges, + std::vector& target_data) { + target_data.clear(); + if (src_data.empty() || copy_ranges.empty() || row_width <= 0) { + return false; + } + + // calculate result bytes + int64_t bytes = 0; + for (auto& pair : copy_ranges) { + if (pair.second <= pair.first) { + continue; + } + bytes += (pair.second - pair.first) * row_width; + } + target_data.resize(bytes); + + // copy data to result + size_t poz = 0; + for (auto& pair : copy_ranges) { + size_t len = (pair.second - pair.first) * row_width; + memcpy(target_data.data() + poz, src_data.data() + pair.first * row_width, len); + poz += len; + } + + return true; +} + +} // namespace segment +} // namespace milvus diff --git a/core/src/segment/Utils.h b/core/src/segment/Utils.h new file mode 100644 index 0000000000..96e8c5bf4d --- /dev/null +++ b/core/src/segment/Utils.h @@ -0,0 +1,53 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include +#include +#include +#include + +namespace milvus { +namespace segment { + +using CopyRanges = std::vector>; + +// calculate copy range according to deleted offsets +// for example: +// segment row count is 100, the deleted offsets is: {1,2,3, 6, 9,10} +// the copy ranges will be: +// { +// {0, 1} +// {4, 6} +// {7, 9} +// {11, 100} +// } +bool +CalcCopyRangesWithOffset(const std::vector& offsets, int64_t row_count, CopyRanges& copy_ranges, + int64_t& delete_count); + +// copy data from source data according to copy ranges +// for example: +// each row_with is 8 bytes +// src_data has 100 rows, means 800 bytes +// the copy ranges is: +// { +// {0, 10} +// {50, 90} +// } +// then the target_data will have (10 - 0) * 8 + (90 - 50) * 8 = 400 bytes copied from src_data +bool +CopyDataWithRanges(const std::vector& src_data, int64_t row_width, const CopyRanges& copy_ranges, + std::vector& target_data); + +} // namespace segment +} // namespace milvus diff --git a/core/src/server/ValidationUtil.cpp b/core/src/server/ValidationUtil.cpp index 1deb2f5e5b..bcf9c6ee57 100644 --- a/core/src/server/ValidationUtil.cpp +++ b/core/src/server/ValidationUtil.cpp @@ -10,6 +10,7 @@ // or implied. See the License for the specific language governing permissions and limitations under the License. #include "server/ValidationUtil.h" +#include "config/ServerConfig.h" #include "db/Constants.h" #include "db/Utils.h" #include "knowhere/index/vector_index/ConfAdapter.h" @@ -343,9 +344,11 @@ ValidateIndexParams(const milvus::json& index_params, int64_t dimension, const s Status ValidateSegmentRowCount(int64_t segment_row_count) { - if (segment_row_count <= 0 || segment_row_count > engine::MAX_SEGMENT_ROW_COUNT) { + int64_t min = config.engine.build_index_threshold.value; + int max = engine::MAX_SEGMENT_ROW_COUNT; + if (segment_row_count < min || segment_row_count > max) { std::string msg = "Invalid segment row count: " + std::to_string(segment_row_count) + ". " + - "Should be in range 1 ~ " + std::to_string(engine::MAX_SEGMENT_ROW_COUNT) + "."; + "Should be in range " + std::to_string(min) + " ~ " + std::to_string(max) + "."; LOG_SERVER_ERROR_ << msg; return Status(SERVER_INVALID_SEGMENT_ROW_COUNT, msg); } diff --git a/core/unittest/db/test_segment.cpp b/core/unittest/db/test_segment.cpp index 9fac340fb9..ce3454797b 100644 --- a/core/unittest/db/test_segment.cpp +++ b/core/unittest/db/test_segment.cpp @@ -13,6 +13,7 @@ #include #include +#include #include #include @@ -27,6 +28,7 @@ #include "segment/SegmentReader.h" #include "segment/SegmentWriter.h" #include "segment/IdBloomFilter.h" +#include "segment/Utils.h" #include "storage/disk/DiskIOReader.h" #include "storage/disk/DiskIOWriter.h" #include "utils/Json.h" @@ -46,20 +48,29 @@ CreateCollection(std::shared_ptr db, const std::string& collection_name, con int64_t collection_id = 0; int64_t field_id = 0; /* field uid */ - auto uid_field = std::make_shared(milvus::engine::FIELD_UID, 0, - milvus::engine::DataType::INT64, milvus::engine::snapshot::JEmpty, field_id); - auto uid_field_element_blt = std::make_shared(collection_id, field_id, - milvus::engine::ELEMENT_BLOOM_FILTER, milvus::engine::FieldElementType::FET_BLOOM_FILTER); - auto uid_field_element_del = std::make_shared(collection_id, field_id, - milvus::engine::ELEMENT_DELETED_DOCS, milvus::engine::FieldElementType::FET_DELETED_DOCS); + auto uid_field = std::make_shared(milvus::engine::FIELD_UID, + 0, + milvus::engine::DataType::INT64, + milvus::engine::snapshot::JEmpty, + field_id); + auto uid_field_element_blt = std::make_shared(collection_id, + field_id, + milvus::engine::ELEMENT_BLOOM_FILTER, + milvus::engine::FieldElementType::FET_BLOOM_FILTER); + auto uid_field_element_del = std::make_shared(collection_id, + field_id, + milvus::engine::ELEMENT_DELETED_DOCS, + milvus::engine::FieldElementType::FET_DELETED_DOCS); field_id++; /* field vector */ milvus::json vector_param = {{milvus::knowhere::meta::DIM, 4}}; auto vector_field = std::make_shared("vector", 0, milvus::engine::DataType::VECTOR_FLOAT, vector_param, - field_id); - auto vector_field_element_index = std::make_shared(collection_id, field_id, - milvus::knowhere::IndexEnum::INDEX_FAISS_IVFSQ8, milvus::engine::FieldElementType::FET_INDEX); + field_id); + auto vector_field_element_index = std::make_shared(collection_id, + field_id, + milvus::knowhere::IndexEnum::INDEX_FAISS_IVFSQ8, + milvus::engine::FieldElementType::FET_INDEX); context.fields_schema[uid_field] = {uid_field_element_blt, uid_field_element_del}; context.fields_schema[vector_field] = {vector_field_element_index}; @@ -189,7 +200,7 @@ TEST(BloomFilterTest, ReadWriteTest) { } double error_rate = filter.ErrorRate(); - double wrong_rate = (double)wrong_check/id_count; + double wrong_rate = (double)wrong_check / id_count; ASSERT_LT(wrong_rate, error_rate); }; @@ -203,7 +214,7 @@ TEST(BloomFilterTest, ReadWriteTest) { } double error_rate = filter.ErrorRate(); - double wrong_rate = (double)wrong_check/id_count; + double wrong_rate = (double)wrong_check / id_count; ASSERT_LT(wrong_rate, error_rate); }; @@ -310,7 +321,7 @@ TEST(BloomFilterTest, CloneTest) { } double error_rate = filter->ErrorRate(); - double wrong_rate = (double)wrong_check/id_count; + double wrong_rate = (double)wrong_check / id_count; ASSERT_LT(wrong_rate, error_rate); }; @@ -322,3 +333,132 @@ TEST(BloomFilterTest, CloneTest) { error_rate_check(clone_filter, removed_id_array); } + +TEST(SegmentUtilTest, CalcCopyRangeTest) { + // invalid input test + std::vector offsets; + int64_t row_count = 0, delete_count = 0; + milvus::segment::CopyRanges copy_ranges; + bool res = milvus::segment::CalcCopyRangesWithOffset(offsets, row_count, copy_ranges, delete_count); + ASSERT_FALSE(res); + + row_count = 100; + + auto compare_result = + [&](const std::vector& offsets, const milvus::segment::CopyRanges& compare_range) -> void { + milvus::segment::CopyRanges copy_ranges; + res = milvus::segment::CalcCopyRangesWithOffset(offsets, row_count, copy_ranges, delete_count); + ASSERT_TRUE(res); + + int64_t compare_count = 0; + for (auto offset : offsets) { + if (offset >= 0 && offset < row_count) { + compare_count++; + } + } + + ASSERT_EQ(delete_count, compare_count); + ASSERT_EQ(copy_ranges.size(), compare_range.size()); + for (size_t i = 0; i < copy_ranges.size(); ++i) { + ASSERT_EQ(copy_ranges[i], compare_range[i]); + } + }; + + { + offsets = {0, 1, 2, 99, 100}; + milvus::segment::CopyRanges compare = { + {3, 99} + }; + compare_result(offsets, compare); + } + + { + offsets = {-1, 5, 4, 3, 90, 91}; + milvus::segment::CopyRanges compare = { + {0, 3}, + {6, 90}, + {92, 100}, + }; + compare_result(offsets, compare); + } +} + +TEST(SegmentUtilTest, CopyRangeDataTest) { + auto compare_result = [&](std::vector& src_data, + std::vector& offsets, + int64_t row_count, + int64_t row_width) -> void { + int64_t delete_count = 0; + milvus::segment::CopyRanges copy_ranges; + auto res = milvus::segment::CalcCopyRangesWithOffset(offsets, row_count, copy_ranges, delete_count); + ASSERT_TRUE(res); + + if (copy_ranges.empty()) { + return; + } + + std::vector target_data; + res = milvus::segment::CopyDataWithRanges(src_data, row_width, copy_ranges, target_data); + ASSERT_TRUE(res); + + // erase element from the largest offset + std::vector compare_data = src_data; + std::set arrange_offsets; + for (auto offset : offsets) { + if (offset >= 0 && offset < row_count) { + arrange_offsets.insert(offset); + } + } + + for (auto iter = arrange_offsets.rbegin(); iter != arrange_offsets.rend(); ++iter) { + auto step = (*iter) * row_width; + compare_data.erase(compare_data.begin() + step, compare_data.begin() + step + row_width); + } + ASSERT_EQ(target_data, compare_data); + }; + + // invalid input test + std::vector offsets; + std::vector src_data; + int64_t row_width = 0; + milvus::segment::CopyRanges copy_ranges; + std::vector target_data; + bool res = milvus::segment::CopyDataWithRanges(src_data, row_width, copy_ranges, target_data); + ASSERT_FALSE(res); + + // construct source data + row_width = 64; + int64_t row_count = 100; + src_data.resize(row_count * row_width); + for (int64_t i = 0; i < row_count * row_width; ++i) { + src_data[i] = i % 255; + } + { + offsets = {0, 1, 2, 99, 100}; + compare_result(src_data, offsets, row_count, row_width); + } + + { + offsets = {-1, 5, 4, 3, 90, 91}; + compare_result(src_data, offsets, row_count, row_width); + } + + // random test + for (int32_t i = 0; i < 10; ++i) { + std::default_random_engine random; + row_count = random() % 100 + 1; + row_width = random() % 8 + 8; + + src_data.resize(row_count * row_width); + for (int64_t i = 0; i < row_count * row_width; ++i) { + src_data[i] = i % 255; + } + + int64_t offset_count = (row_count > 1) ? (random() % row_count + 1) : 1; + offsets.resize(offset_count); + for (int64_t k = 0; k < offset_count; ++k) { + offsets[k] = (random() % row_count) + ((k % 2 == 0) ? 2 : -2); + } + compare_result(src_data, offsets, row_count, row_width); + } +} diff --git a/tests/milvus_python_test/utils.py b/tests/milvus_python_test/utils.py index eb7690ca0a..1a5b5dde60 100644 --- a/tests/milvus_python_test/utils.py +++ b/tests/milvus_python_test/utils.py @@ -468,10 +468,9 @@ def add_vector_field(nb, dimension=dimension): def gen_segment_row_counts(): sizes = [ - 1, - 2, - 1024, - 4096 + 4096, + 8192, + 1000000, ] return sizes