mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
parent
dd5078ad9d
commit
cff80c9e34
@ -20,7 +20,6 @@
|
||||
#include "db/merge/MergeTask.h"
|
||||
#include "db/snapshot/CompoundOperations.h"
|
||||
#include "db/snapshot/EventExecutor.h"
|
||||
#include "db/snapshot/IterateHandler.h"
|
||||
#include "db/snapshot/OperationExecutor.h"
|
||||
#include "db/snapshot/ResourceHelper.h"
|
||||
#include "db/snapshot/ResourceTypes.h"
|
||||
@ -38,7 +37,6 @@
|
||||
#include "segment/Utils.h"
|
||||
#include "server/ValidationUtil.h"
|
||||
#include "utils/Exception.h"
|
||||
#include "utils/StringHelpFunctions.h"
|
||||
#include "utils/TimeRecorder.h"
|
||||
|
||||
#include <fiu/fiu-local.h>
|
||||
@ -649,46 +647,10 @@ DBImpl::Query(const server::ContextPtr& context, const query::QueryPtr& query_pt
|
||||
snapshot::ScopedSnapshotT ss;
|
||||
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, query_ptr->collection_id));
|
||||
|
||||
/* collect all valid segment */
|
||||
std::vector<SegmentVisitor::Ptr> segment_visitors;
|
||||
auto exec = [&](const snapshot::Segment::Ptr& segment, snapshot::SegmentIterator* handler) -> Status {
|
||||
auto p_id = segment->GetPartitionId();
|
||||
auto p_ptr = ss->GetResource<snapshot::Partition>(p_id);
|
||||
auto& p_name = p_ptr->GetName();
|
||||
|
||||
/* check partition match pattern */
|
||||
bool match = false;
|
||||
if (query_ptr->partitions.empty()) {
|
||||
match = true;
|
||||
} else {
|
||||
for (auto& pattern : query_ptr->partitions) {
|
||||
if (StringHelpFunctions::IsRegexMatch(p_name, pattern)) {
|
||||
match = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (match) {
|
||||
auto visitor = SegmentVisitor::Build(ss, segment->GetID());
|
||||
if (!visitor) {
|
||||
return Status(milvus::SS_ERROR, "Cannot build segment visitor");
|
||||
}
|
||||
segment_visitors.push_back(visitor);
|
||||
}
|
||||
return Status::OK();
|
||||
};
|
||||
|
||||
auto segment_iter = std::make_shared<snapshot::SegmentIterator>(ss, exec);
|
||||
segment_iter->Iterate();
|
||||
STATUS_CHECK(segment_iter->GetStatus());
|
||||
|
||||
LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, segment count: %ld", segment_visitors.size());
|
||||
|
||||
engine::snapshot::IDS_TYPE segment_ids;
|
||||
for (auto& sv : segment_visitors) {
|
||||
segment_ids.emplace_back(sv->GetSegment()->GetID());
|
||||
}
|
||||
SnapshotVisitor ss_visitor(ss);
|
||||
snapshot::IDS_TYPE segment_ids;
|
||||
STATUS_CHECK(ss_visitor.SegmentsToSearch(query_ptr->partitions, segment_ids));
|
||||
LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, segment count: %ld", segment_ids.size());
|
||||
|
||||
scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(nullptr, ss, options_, query_ptr, segment_ids);
|
||||
|
||||
@ -760,17 +722,6 @@ DBImpl::ListIDInSegment(const std::string& collection_name, int64_t segment_id,
|
||||
// remove delete id from the id list
|
||||
segment::DeletedDocsPtr deleted_docs_ptr;
|
||||
segment_reader->LoadDeletedDocs(deleted_docs_ptr);
|
||||
// if (deleted_docs_ptr) {
|
||||
// const std::vector<offset_t>& delete_ids = deleted_docs_ptr->GetDeletedDocs();
|
||||
// std::vector<offset_t> temp_ids;
|
||||
// temp_ids.reserve(delete_ids.size());
|
||||
// std::copy(delete_ids.begin(), delete_ids.end(), std::back_inserter(temp_ids));
|
||||
// std::sort(temp_ids.begin(), temp_ids.end(), std::greater<>());
|
||||
// for (auto offset : temp_ids) {
|
||||
// entity_ids.erase(entity_ids.begin() + offset, entity_ids.begin() + offset + 1);
|
||||
// }
|
||||
// }
|
||||
|
||||
if (deleted_docs_ptr) {
|
||||
// sorted-merge entities id and deleted offsets
|
||||
const std::vector<offset_t>& delete_offsets = deleted_docs_ptr->GetDeletedDocs();
|
||||
@ -880,6 +831,7 @@ DBImpl::Compact(const std::shared_ptr<server::Context>& context, const std::stri
|
||||
continue; // no deleted docs, no need to compact
|
||||
}
|
||||
|
||||
// the segment row count is zero, drop it
|
||||
auto segment_commit = latest_ss->GetSegmentCommitBySegmentId(segment_id);
|
||||
auto row_count = segment_commit->GetRowCount();
|
||||
if (row_count == 0) {
|
||||
@ -895,11 +847,13 @@ DBImpl::Compact(const std::shared_ptr<server::Context>& context, const std::stri
|
||||
continue;
|
||||
}
|
||||
|
||||
// delete rate less than threshold, skip compact
|
||||
auto deleted_count = deleted_docs->GetCount();
|
||||
if (double(deleted_count) / (row_count + deleted_count) < threshold) {
|
||||
continue; // no need to compact
|
||||
}
|
||||
|
||||
// compact segment, the compact action is same as merge
|
||||
snapshot::IDS_TYPE ids = {segment_id};
|
||||
MergeTask merge_task(options_, latest_ss, ids);
|
||||
status = merge_task.Execute();
|
||||
|
||||
@ -20,6 +20,7 @@
|
||||
#include "db/snapshot/Snapshot.h"
|
||||
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
|
||||
#include "segment/SegmentReader.h"
|
||||
#include "utils/StringHelpFunctions.h"
|
||||
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
@ -28,13 +29,35 @@ namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// Constructs the collector: forwards the snapshot `ss` to BaseT, initializes
// partitions_ from `partitions`, and binds segment_ids_ to the caller-supplied
// `segment_ids` container that Handle() appends to.
// NOTE(review): two signatures appear back to back here; this listing looks
// like a rendered diff, so the two-argument form is presumably the pre-change
// version and the three-argument form its replacement -- confirm against the
// real source file.
SegmentsToSearchCollector::SegmentsToSearchCollector(snapshot::ScopedSnapshotT ss, snapshot::IDS_TYPE& segment_ids)
|
||||
: BaseT(ss), segment_ids_(segment_ids) {
|
||||
SegmentsToSearchCollector::SegmentsToSearchCollector(snapshot::ScopedSnapshotT ss,
|
||||
const std::vector<std::string>& partitions,
|
||||
snapshot::IDS_TYPE& segment_ids)
|
||||
: BaseT(ss), partitions_(partitions), segment_ids_(segment_ids) {
|
||||
}
|
||||
|
||||
// Handles one segment commit: looks up the commit's partition through ss_ and
// appends the commit's segment id to segment_ids_ when partitions_ is empty or
// the partition name matches at least one entry of partitions_ according to
// StringHelpFunctions::IsRegexMatch. Always returns Status::OK().
// NOTE(review): this listing looks like a rendered diff with old and new lines
// interleaved; the unconditional push_back right after the signature is
// presumably the removed pre-change line (as written, a matching segment id
// would be appended twice) -- confirm against the real source file.
// NOTE(review): p_ptr is dereferenced without a null check; confirm that
// GetResource cannot return an empty pointer for this partition id.
Status
|
||||
SegmentsToSearchCollector::Handle(const snapshot::SegmentCommitPtr& segment_commit) {
|
||||
segment_ids_.push_back(segment_commit->GetSegmentId());
|
||||
auto p_id = segment_commit->GetPartitionId();
|
||||
auto p_ptr = ss_->GetResource<snapshot::Partition>(p_id);
|
||||
auto& p_name = p_ptr->GetName();
|
||||
|
||||
/* check partition match pattern */
|
||||
// An empty pattern list selects every partition.
bool match = false;
|
||||
if (partitions_.empty()) {
|
||||
match = true;
|
||||
} else {
|
||||
// Stop at the first pattern that matches the partition name.
for (auto& pattern : partitions_) {
|
||||
if (StringHelpFunctions::IsRegexMatch(p_name, pattern)) {
|
||||
match = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (match) {
|
||||
segment_ids_.push_back(segment_commit->GetSegmentId());
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
@ -30,11 +30,13 @@ namespace engine {
|
||||
// Iterate handler over snapshot::SegmentCommit resources that collects segment
// ids into a caller-owned container, optionally restricted by partition name
// patterns (see Handle()).
// NOTE(review): two constructor declarations appear below; this listing looks
// like a rendered diff, so the two-argument one is presumably the pre-change
// declaration -- confirm against the real header.
struct SegmentsToSearchCollector : public snapshot::SegmentCommitIterator {
|
||||
using ResourceT = snapshot::SegmentCommit;
|
||||
using BaseT = snapshot::IterateHandler<ResourceT>;
|
||||
SegmentsToSearchCollector(snapshot::ScopedSnapshotT ss, snapshot::IDS_TYPE& segment_ids);
|
||||
SegmentsToSearchCollector(snapshot::ScopedSnapshotT ss, const std::vector<std::string>& partitions,
|
||||
snapshot::IDS_TYPE& segment_ids);
|
||||
|
||||
// Invoked with each SegmentCommit resource (ResourceT::Ptr).
Status
|
||||
Handle(const typename ResourceT::Ptr&) override;
|
||||
|
||||
// Partition name patterns, held by value (copied from the constructor argument).
std::vector<std::string> partitions_;
|
||||
// Output container, held by reference: the referenced object must outlive
// this handler.
snapshot::IDS_TYPE& segment_ids_;
|
||||
};
|
||||
|
||||
|
||||
@ -200,7 +200,8 @@ GetSnapshotInfo(const std::string& collection_name, milvus::json& json_info) {
|
||||
|
||||
// just ensure segments listed in id order
|
||||
snapshot::IDS_TYPE segment_ids;
|
||||
auto handler = std::make_shared<SegmentsToSearchCollector>(ss, segment_ids);
|
||||
std::vector<std::string> partition_tags;
|
||||
auto handler = std::make_shared<SegmentsToSearchCollector>(ss, partition_tags, segment_ids);
|
||||
handler->Iterate();
|
||||
std::sort(segment_ids.begin(), segment_ids.end());
|
||||
|
||||
|
||||
@ -33,10 +33,10 @@ SnapshotVisitor::SnapshotVisitor(snapshot::ID_TYPE collection_id) {
|
||||
}
|
||||
|
||||
Status
|
||||
SnapshotVisitor::SegmentsToSearch(snapshot::IDS_TYPE& segment_ids) {
|
||||
SnapshotVisitor::SegmentsToSearch(const std::vector<std::string>& partitions, snapshot::IDS_TYPE& segment_ids) {
|
||||
STATUS_CHECK(status_);
|
||||
|
||||
auto handler = std::make_shared<SegmentsToSearchCollector>(ss_, segment_ids);
|
||||
auto handler = std::make_shared<SegmentsToSearchCollector>(ss_, partitions, segment_ids);
|
||||
handler->Iterate();
|
||||
|
||||
return handler->GetStatus();
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
#include <memory>
|
||||
#include <set>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
@ -28,7 +29,7 @@ class SnapshotVisitor {
|
||||
explicit SnapshotVisitor(snapshot::ID_TYPE collection_id);
|
||||
|
||||
Status
|
||||
SegmentsToSearch(snapshot::IDS_TYPE& segment_ids);
|
||||
SegmentsToSearch(const std::vector<std::string>& partitions, snapshot::IDS_TYPE& segment_ids);
|
||||
|
||||
Status
|
||||
SegmentsToIndex(const std::string& field_name, snapshot::IDS_TYPE& segment_ids, bool force_build);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user