fix delete entity performance issue (#3667)

* fix delete entity performance issue

Signed-off-by: groot <yihua.mo@zilliz.com>

* force flush for delete

Signed-off-by: groot <yihua.mo@zilliz.com>

* improve delete performance

Signed-off-by: groot <yihua.mo@zilliz.com>

* fix delete performce issue

Signed-off-by: groot <yihua.mo@zilliz.com>

* fix hang

Signed-off-by: groot <yihua.mo@zilliz.com>

* segment row count min limit

Signed-off-by: groot <yihua.mo@zilliz.com>

* merge 0.11.0

Signed-off-by: groot <yihua.mo@zilliz.com>

* fix python test

Signed-off-by: groot <yihua.mo@zilliz.com>

* fix python test failed

Signed-off-by: groot <yihua.mo@zilliz.com>

* add log

Signed-off-by: groot <yihua.mo@zilliz.com>
This commit is contained in:
groot 2020-09-14 16:38:25 +08:00 committed by GitHub
parent bd2a3d6e63
commit 7e1345c28f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 633 additions and 209 deletions

View File

@ -204,6 +204,9 @@ DBImpl::DropCollection(const std::string& collection_name) {
// erase cache
ClearCollectionCache(ss, options_.meta_.path_);
// clear index failed retry map of this collection
ClearIndexFailedRecord(collection_name);
return snapshots.DropCollection(ss->GetCollectionId(), std::numeric_limits<snapshot::LSN_TYPE>::max());
}
@ -344,42 +347,40 @@ DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::
auto status = Flush();
WaitMergeFileFinish(); // let merge file thread finish
// step 2: compare old index and new index
// step 2: compare old index and new index, drop old index, set new index
CollectionIndex new_index = index;
CollectionIndex old_index;
STATUS_CHECK(GetSnapshotIndex(collection_name, field_name, old_index));
if (utils::IsSameIndex(old_index, new_index)) {
return Status::OK(); // same index
if (!utils::IsSameIndex(old_index, new_index)) {
DropIndex(collection_name, field_name);
WaitMergeFileFinish(); // let merge file thread finish since DropIndex start a merge task
// create field element for new index
status = SetSnapshotIndex(collection_name, field_name, new_index);
if (!status.ok()) {
return status;
}
}
// step 3: drop old index
DropIndex(collection_name, field_name);
WaitMergeFileFinish(); // let merge file thread finish since DropIndex start a merge task
// clear index failed retry map of this collection
ClearIndexFailedRecord(collection_name);
// step 4: create field element for index
status = SetSnapshotIndex(collection_name, field_name, new_index);
if (!status.ok()) {
return status;
}
// step 5: start background build index thread
std::vector<std::string> collection_names = {collection_name};
WaitBuildIndexFinish();
StartBuildIndexTask(collection_names, true);
// step 6: iterate segments need to be build index, wait until all segments are built
// step 3: iterate segments need to be build index, wait until all segments are built
while (true) {
// start background build index thread
std::vector<std::string> collection_names = {collection_name};
StartBuildIndexTask(collection_names, true);
// check if all segments are built
SnapshotVisitor ss_visitor(collection_name);
snapshot::IDS_TYPE segment_ids;
ss_visitor.SegmentsToIndex(field_name, segment_ids);
ss_visitor.SegmentsToIndex(field_name, segment_ids, true);
if (segment_ids.empty()) {
break; // all segments build index finished
}
snapshot::ScopedSnapshotT ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
IgnoreIndexFailedSegments(ss->GetCollectionId(), segment_ids);
IgnoreIndexFailedSegments(collection_name, segment_ids);
if (segment_ids.empty()) {
break; // some segments failed to build index, and ignored
}
@ -522,10 +523,8 @@ DBImpl::Insert(const std::string& collection_name, const std::string& partition_
}
// do insert
int64_t segment_row_count = DEFAULT_SEGMENT_ROW_COUNT;
if (params.find(PARAM_SEGMENT_ROW_COUNT) != params.end()) {
segment_row_count = params[PARAM_SEGMENT_ROW_COUNT];
}
int64_t segment_row_count = 0;
GetSegmentRowCount(ss->GetCollection(), segment_row_count);
int64_t collection_id = ss->GetCollectionId();
int64_t partition_id = partition->GetID();
@ -538,7 +537,9 @@ DBImpl::Insert(const std::string& collection_name, const std::string& partition_
if (!status.ok()) {
return status;
}
if (mem_mgr_->GetCurrentMem() > options_.insert_buffer_size_) {
std::set<int64_t> collection_ids;
if (mem_mgr_->RequireFlush(collection_ids)) {
LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) << "Insert buffer size exceeds limit. Force flush";
InternalFlush();
}
@ -583,7 +584,20 @@ DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNum
}
status = mem_mgr_->DeleteEntities(ss->GetCollectionId(), entity_ids, op_id);
return status;
if (!status.ok()) {
return status;
}
std::set<int64_t> collection_ids;
if (mem_mgr_->RequireFlush(collection_ids)) {
if (collection_ids.find(ss->GetCollectionId()) != collection_ids.end()) {
LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "delete", 0)
<< "Delete count in buffer exceeds limit. Force flush";
InternalFlush(collection_name);
}
}
return Status::OK();
}
Status
@ -962,7 +976,7 @@ DBImpl::TimingMetricThread() {
}
void
DBImpl::StartBuildIndexTask(const std::vector<std::string>& collection_names, bool reset_retry_times) {
DBImpl::StartBuildIndexTask(const std::vector<std::string>& collection_names, bool force_build) {
if (collection_names.empty()) {
return; // no need to start thread
}
@ -982,19 +996,14 @@ DBImpl::StartBuildIndexTask(const std::vector<std::string>& collection_names, bo
{
std::lock_guard<std::mutex> lck(index_result_mutex_);
if (index_thread_results_.empty()) {
if (reset_retry_times) {
std::lock_guard<std::mutex> lock(index_retry_mutex_);
index_retry_map_.clear(); // reset index retry times
}
index_thread_results_.push_back(
index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndexTask, this, collection_names));
index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndexTask, this, collection_names, force_build));
}
}
}
void
DBImpl::BackgroundBuildIndexTask(std::vector<std::string> collection_names) {
DBImpl::BackgroundBuildIndexTask(std::vector<std::string> collection_names, bool force_build) {
SetThreadName("build_index");
std::unique_lock<std::mutex> lock(build_index_mutex_);
@ -1008,14 +1017,13 @@ DBImpl::BackgroundBuildIndexTask(std::vector<std::string> collection_names) {
SnapshotVisitor ss_visitor(latest_ss);
snapshot::IDS_TYPE segment_ids;
ss_visitor.SegmentsToIndex("", segment_ids);
ss_visitor.SegmentsToIndex("", segment_ids, force_build);
if (segment_ids.empty()) {
continue;
}
// check index retry times
snapshot::ID_TYPE collection_id = latest_ss->GetCollectionId();
IgnoreIndexFailedSegments(collection_id, segment_ids);
IgnoreIndexFailedSegments(collection_name, segment_ids);
if (segment_ids.empty()) {
continue;
}
@ -1034,7 +1042,7 @@ DBImpl::BackgroundBuildIndexTask(std::vector<std::string> collection_names) {
// record failed segments, avoid build index hang
snapshot::IDS_TYPE& failed_ids = job->FailedSegments();
MarkIndexFailedSegments(collection_id, failed_ids);
MarkIndexFailedSegments(collection_name, failed_ids);
if (!job->status().ok()) {
LOG_ENGINE_ERROR_ << job->status().message();
@ -1177,18 +1185,18 @@ DBImpl::ConfigUpdate(const std::string& name) {
}
void
DBImpl::MarkIndexFailedSegments(snapshot::ID_TYPE collection_id, const snapshot::IDS_TYPE& failed_ids) {
DBImpl::MarkIndexFailedSegments(const std::string& collection_name, const snapshot::IDS_TYPE& failed_ids) {
std::lock_guard<std::mutex> lock(index_retry_mutex_);
SegmentIndexRetryMap& retry_map = index_retry_map_[collection_id];
SegmentIndexRetryMap& retry_map = index_retry_map_[collection_name];
for (auto& id : failed_ids) {
retry_map[id]++;
}
}
void
DBImpl::IgnoreIndexFailedSegments(snapshot::ID_TYPE collection_id, snapshot::IDS_TYPE& segment_ids) {
DBImpl::IgnoreIndexFailedSegments(const std::string& collection_name, snapshot::IDS_TYPE& segment_ids) {
std::lock_guard<std::mutex> lock(index_retry_mutex_);
SegmentIndexRetryMap& retry_map = index_retry_map_[collection_id];
SegmentIndexRetryMap& retry_map = index_retry_map_[collection_name];
snapshot::IDS_TYPE segment_ids_to_build;
for (auto id : segment_ids) {
if (retry_map[id] < BUILD_INEDX_RETRY_TIMES) {
@ -1198,5 +1206,11 @@ DBImpl::IgnoreIndexFailedSegments(snapshot::ID_TYPE collection_id, snapshot::IDS
segment_ids.swap(segment_ids_to_build);
}
void
DBImpl::ClearIndexFailedRecord(const std::string& collection_name) {
std::lock_guard<std::mutex> lock(index_retry_mutex_);
index_retry_map_.erase(collection_name);
}
} // namespace engine
} // namespace milvus

View File

@ -140,10 +140,10 @@ class DBImpl : public DB, public ConfigObserver {
TimingMetricThread();
void
StartBuildIndexTask(const std::vector<std::string>& collection_names, bool reset_retry_times);
StartBuildIndexTask(const std::vector<std::string>& collection_names, bool force_build);
void
BackgroundBuildIndexTask(std::vector<std::string> collection_names);
BackgroundBuildIndexTask(std::vector<std::string> collection_names, bool force_build);
void
TimingIndexThread();
@ -173,10 +173,13 @@ class DBImpl : public DB, public ConfigObserver {
DecreaseLiveBuildTaskNum();
void
MarkIndexFailedSegments(snapshot::ID_TYPE collection_id, const snapshot::IDS_TYPE& failed_ids);
MarkIndexFailedSegments(const std::string& collection_name, const snapshot::IDS_TYPE& failed_ids);
void
IgnoreIndexFailedSegments(snapshot::ID_TYPE collection_id, snapshot::IDS_TYPE& segment_ids);
IgnoreIndexFailedSegments(const std::string& collection_name, snapshot::IDS_TYPE& segment_ids);
void
ClearIndexFailedRecord(const std::string& collection_name);
private:
DBOptions options_;
@ -205,7 +208,7 @@ class DBImpl : public DB, public ConfigObserver {
std::list<std::future<void>> index_thread_results_;
using SegmentIndexRetryMap = std::unordered_map<snapshot::ID_TYPE, int64_t>;
using CollectionIndexRetryMap = std::unordered_map<snapshot::ID_TYPE, SegmentIndexRetryMap>;
using CollectionIndexRetryMap = std::unordered_map<std::string, SegmentIndexRetryMap>;
CollectionIndexRetryMap index_retry_map_;
std::mutex index_retry_mutex_;

View File

@ -11,7 +11,6 @@
#include "db/SnapshotHandlers.h"
#include "config/ServerConfig.h"
#include "db/SnapshotUtils.h"
#include "db/SnapshotVisitor.h"
#include "db/Types.h"
@ -40,9 +39,8 @@ SegmentsToSearchCollector::Handle(const snapshot::SegmentCommitPtr& segment_comm
///////////////////////////////////////////////////////////////////////////////
SegmentsToIndexCollector::SegmentsToIndexCollector(snapshot::ScopedSnapshotT ss, const std::string& field_name,
snapshot::IDS_TYPE& segment_ids)
: BaseT(ss), field_name_(field_name), segment_ids_(segment_ids) {
build_index_threshold_ = config.engine.build_index_threshold();
snapshot::IDS_TYPE& segment_ids, int64_t build_index_threshold)
: BaseT(ss), field_name_(field_name), segment_ids_(segment_ids), build_index_threshold_(build_index_threshold) {
}
Status

View File

@ -42,7 +42,7 @@ struct SegmentsToIndexCollector : public snapshot::SegmentCommitIterator {
using ResourceT = snapshot::SegmentCommit;
using BaseT = snapshot::IterateHandler<ResourceT>;
SegmentsToIndexCollector(snapshot::ScopedSnapshotT ss, const std::string& field_name,
snapshot::IDS_TYPE& segment_ids);
snapshot::IDS_TYPE& segment_ids, int64_t build_index_threshold);
Status
Handle(const typename ResourceT::Ptr&) override;

View File

@ -266,28 +266,27 @@ GetSnapshotInfo(const std::string& collection_name, milvus::json& json_info) {
Status
GetSegmentRowCount(const std::string& collection_name, int64_t& segment_row_count) {
segment_row_count = DEFAULT_SEGMENT_ROW_COUNT;
snapshot::ScopedSnapshotT latest_ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name));
// get row count per segment
auto collection = latest_ss->GetCollection();
const json params = collection->GetParams();
if (params.find(PARAM_SEGMENT_ROW_COUNT) != params.end()) {
segment_row_count = params[PARAM_SEGMENT_ROW_COUNT];
}
return Status::OK();
return GetSegmentRowCount(collection, segment_row_count);
}
Status
GetSegmentRowCount(int64_t collection_id, int64_t& segment_row_count) {
segment_row_count = DEFAULT_SEGMENT_ROW_COUNT;
snapshot::ScopedSnapshotT latest_ss;
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_id));
// get row count per segment
auto collection = latest_ss->GetCollection();
return GetSegmentRowCount(collection, segment_row_count);
}
Status
GetSegmentRowCount(const snapshot::CollectionPtr& collection, int64_t& segment_row_count) {
segment_row_count = DEFAULT_SEGMENT_ROW_COUNT;
const json params = collection->GetParams();
if (params.find(PARAM_SEGMENT_ROW_COUNT) != params.end()) {
segment_row_count = params[PARAM_SEGMENT_ROW_COUNT];
@ -343,5 +342,18 @@ ClearIndexCache(snapshot::ScopedSnapshotT& ss, const std::string& dir_root, cons
return Status::OK();
}
Status
DropSegment(snapshot::ScopedSnapshotT& ss, snapshot::ID_TYPE segment_id) {
snapshot::OperationContext drop_seg_context;
auto segment = ss->GetResource<snapshot::Segment>(segment_id);
if (segment == nullptr) {
return Status(DB_ERROR, "Invalid segment id");
}
drop_seg_context.prev_segment = segment;
auto drop_op = std::make_shared<snapshot::DropSegmentOperation>(drop_seg_context, ss);
return drop_op->Push();
}
} // namespace engine
} // namespace milvus

View File

@ -59,6 +59,9 @@ GetSegmentRowCount(const std::string& collection_name, int64_t& segment_row_coun
Status
GetSegmentRowCount(int64_t collection_id, int64_t& segment_row_count);
Status
GetSegmentRowCount(const snapshot::CollectionPtr& collection, int64_t& segment_row_count);
Status
ClearCollectionCache(snapshot::ScopedSnapshotT& ss, const std::string& dir_root);
@ -68,5 +71,8 @@ ClearPartitionCache(snapshot::ScopedSnapshotT& ss, const std::string& dir_root,
Status
ClearIndexCache(snapshot::ScopedSnapshotT& ss, const std::string& dir_root, const std::string& field_name);
Status
DropSegment(snapshot::ScopedSnapshotT& ss, snapshot::ID_TYPE segment_id);
} // namespace engine
} // namespace milvus

View File

@ -10,12 +10,14 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/SnapshotVisitor.h"
#include <sstream>
#include "config/ServerConfig.h"
#include "db/SnapshotHandlers.h"
#include "db/SnapshotUtils.h"
#include "db/Types.h"
#include "db/snapshot/Snapshots.h"
#include <sstream>
namespace milvus {
namespace engine {
@ -41,10 +43,19 @@ SnapshotVisitor::SegmentsToSearch(snapshot::IDS_TYPE& segment_ids) {
}
Status
SnapshotVisitor::SegmentsToIndex(const std::string& field_name, snapshot::IDS_TYPE& segment_ids) {
SnapshotVisitor::SegmentsToIndex(const std::string& field_name, snapshot::IDS_TYPE& segment_ids, bool force_build) {
STATUS_CHECK(status_);
auto handler = std::make_shared<SegmentsToIndexCollector>(ss_, field_name, segment_ids);
// force_build means client invoke create_index,
// all segments whose row_count greater than config.build_index_threshold will be counted in.
// else, only the segments whose row_count greater than segment_row_count will be counted in
int64_t build_index_threshold = config.engine.build_index_threshold.value;
if (!force_build) {
auto collection = ss_->GetCollection();
GetSegmentRowCount(collection, build_index_threshold);
}
auto handler = std::make_shared<SegmentsToIndexCollector>(ss_, field_name, segment_ids, build_index_threshold);
handler->Iterate();
return handler->GetStatus();

View File

@ -31,7 +31,7 @@ class SnapshotVisitor {
SegmentsToSearch(snapshot::IDS_TYPE& segment_ids);
Status
SegmentsToIndex(const std::string& field_name, snapshot::IDS_TYPE& segment_ids);
SegmentsToIndex(const std::string& field_name, snapshot::IDS_TYPE& segment_ids, bool force_build);
protected:
snapshot::ScopedSnapshotT ss_;

View File

@ -19,6 +19,7 @@
#include <ctime>
#include <memory>
#include <string>
#include <unordered_set>
#include <utility>
#include <fiu/fiu-local.h>
@ -102,6 +103,11 @@ MemCollection::Delete(const std::vector<idx_t>& ids, idx_t op_id) {
return Status::OK();
}
size_t
MemCollection::DeleteCount() const {
return ids_to_delete_.size();
}
Status
MemCollection::EraseMem(int64_t partition_id) {
std::lock_guard<std::mutex> lock(mem_mutex_);
@ -132,6 +138,7 @@ MemCollection::Serialize() {
break;
}
}
recorder.RecordSection("ApplyDeleteToFile");
// serialize mem to new segment files
// delete ids will be applied in MemSegment::Serialize() method
@ -168,116 +175,73 @@ MemCollection::ApplyDeleteToFile() {
int64_t segment_iterated = 0;
auto segment_executor = [&](const snapshot::SegmentPtr& segment, snapshot::SegmentIterator* iterator) -> Status {
TimeRecorder recorder("MemCollection::ApplyDeleteToFile collection " + std::to_string(collection_id_) +
" segment " + std::to_string(segment->GetID()));
segment_iterated++;
auto seg_visitor = engine::SegmentVisitor::Build(ss, segment->GetID());
segment::SegmentReaderPtr segment_reader =
std::make_shared<segment::SegmentReader>(options_.meta_.path_, seg_visitor);
// Step 1: Check delete_id in mem
std::set<idx_t> ids_to_check;
{
segment::IdBloomFilterPtr pre_bloom_filter;
STATUS_CHECK(segment_reader->LoadBloomFilter(pre_bloom_filter));
for (auto& id : ids_to_delete_) {
if (pre_bloom_filter->Check(id)) {
ids_to_check.insert(id);
}
// Step 1: Check to-delete id possibly in this segment
std::unordered_set<idx_t> ids_to_check;
segment::IdBloomFilterPtr pre_bloom_filter;
STATUS_CHECK(segment_reader->LoadBloomFilter(pre_bloom_filter));
for (auto& id : ids_to_delete_) {
if (pre_bloom_filter->Check(id)) {
ids_to_check.insert(id);
}
}
if (ids_to_check.empty()) {
return Status::OK();
return Status::OK(); // nothing change for this segment
}
// load entity ids
std::vector<engine::idx_t> uids;
STATUS_CHECK(segment_reader->LoadUids(uids));
// Step 2: Mark previous deleted docs file and bloom filter file stale
auto& field_visitors_map = seg_visitor->GetFieldVisitors();
auto uid_field_visitor = seg_visitor->GetFieldVisitor(engine::FIELD_UID);
auto del_doc_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_DELETED_DOCS);
auto del_docs_element = del_doc_visitor->GetElement();
auto blm_filter_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_BLOOM_FILTER);
auto blm_filter_element = blm_filter_visitor->GetElement();
auto segment_file_executor = [&](const snapshot::SegmentFilePtr& segment_file,
snapshot::SegmentFileIterator* iterator) -> Status {
if (segment_file->GetSegmentId() == segment->GetID() &&
(segment_file->GetFieldElementId() == del_docs_element->GetID() ||
segment_file->GetFieldElementId() == blm_filter_element->GetID())) {
segments_op->AddStaleSegmentFile(segment_file);
// Load previous deleted offsets
segment::DeletedDocsPtr prev_del_docs;
STATUS_CHECK(segment_reader->LoadDeletedDocs(prev_del_docs));
std::unordered_set<engine::offset_t> del_offsets;
if (prev_del_docs) {
auto prev_del_offsets = prev_del_docs->GetDeletedDocs();
for (auto offset : prev_del_offsets) {
del_offsets.insert(offset);
}
}
uint64_t prev_del_count = del_offsets.size();
return Status::OK();
};
// if the to-delete id is actually in this segment, remove it from bloom filter, and record its offset
segment::IdBloomFilterPtr bloom_filter;
pre_bloom_filter->Clone(bloom_filter);
auto segment_file_iterator = std::make_shared<snapshot::SegmentFileIterator>(ss, segment_file_executor);
segment_file_iterator->Iterate();
STATUS_CHECK(segment_file_iterator->GetStatus());
// Step 3: Create new deleted docs file and bloom filter file
snapshot::SegmentFileContext del_file_context;
del_file_context.field_name = uid_field_visitor->GetField()->GetName();
del_file_context.field_element_name = del_docs_element->GetName();
del_file_context.collection_id = segment->GetCollectionId();
del_file_context.partition_id = segment->GetPartitionId();
del_file_context.segment_id = segment->GetID();
snapshot::SegmentFilePtr delete_file;
STATUS_CHECK(segments_op->CommitNewSegmentFile(del_file_context, delete_file));
std::string collection_root_path = options_.meta_.path_ + COLLECTIONS_FOLDER;
auto segment_writer = std::make_shared<segment::SegmentWriter>(options_.meta_.path_, seg_visitor);
std::string del_docs_path = snapshot::GetResPath<snapshot::SegmentFile>(collection_root_path, delete_file);
snapshot::SegmentFileContext bloom_file_context;
bloom_file_context.field_name = uid_field_visitor->GetField()->GetName();
bloom_file_context.field_element_name = blm_filter_element->GetName();
bloom_file_context.collection_id = segment->GetCollectionId();
bloom_file_context.partition_id = segment->GetPartitionId();
bloom_file_context.segment_id = segment->GetID();
engine::snapshot::SegmentFile::Ptr bloom_filter_file;
STATUS_CHECK(segments_op->CommitNewSegmentFile(bloom_file_context, bloom_filter_file));
std::string bloom_filter_file_path =
snapshot::GetResPath<snapshot::SegmentFile>(collection_root_path, bloom_filter_file);
// Step 4: update delete docs and bloom filter
{
// Load previous delete_id and merge into 'delete_ids'
segment::DeletedDocsPtr prev_del_docs;
STATUS_CHECK(segment_reader->LoadDeletedDocs(prev_del_docs));
std::vector<engine::offset_t> pre_del_offsets;
if (prev_del_docs) {
pre_del_offsets = prev_del_docs->GetDeletedDocs();
for (auto& offset : pre_del_offsets) {
ids_to_check.insert(uids[offset]);
}
for (size_t i = 0; i < uids.size(); i++) {
auto id = uids[i];
if (ids_to_check.find(id) != ids_to_check.end()) {
del_offsets.insert(i);
bloom_filter->Remove(id);
}
segment::IdBloomFilterPtr bloom_filter = std::make_shared<segment::IdBloomFilter>(uids.size());
std::vector<engine::offset_t> delete_docs_offset;
for (size_t i = 0; i < uids.size(); i++) {
if (std::binary_search(ids_to_check.begin(), ids_to_check.end(), uids[i])) {
delete_docs_offset.emplace_back(i);
} else {
bloom_filter->Add(uids[i]);
}
}
STATUS_CHECK(segments_op->CommitRowCountDelta(segment->GetID(),
delete_docs_offset.size() - pre_del_offsets.size(), true));
auto delete_docs = std::make_shared<segment::DeletedDocs>(delete_docs_offset);
STATUS_CHECK(segment_writer->WriteDeletedDocs(del_docs_path, delete_docs));
STATUS_CHECK(segment_writer->WriteBloomFilter(bloom_filter_file_path, bloom_filter));
}
delete_file->SetSize(CommonUtil::GetFileSize(del_docs_path + codec::DeletedDocsFormat::FilePostfix()));
bloom_filter_file->SetSize(
CommonUtil::GetFileSize(bloom_filter_file_path + codec::IdBloomFilterFormat::FilePostfix()));
uint64_t new_deleted = del_offsets.size() - prev_del_count;
if (new_deleted == 0) {
return Status::OK(); // nothing change for this segment
}
recorder.RecordSection("detect deleted " + std::to_string(new_deleted) + " entities");
// Step 3:
// all entities have been deleted? drop this segment
if (del_offsets.size() == uids.size()) {
return DropSegment(ss, segment->GetID());
}
// create new deleted docs file and bloom filter file
STATUS_CHECK(
CreateDeletedDocsBloomFilter(segments_op, ss, seg_visitor, del_offsets, new_deleted, bloom_filter));
recorder.RecordSection("write deleted docs and bloom filter");
return Status::OK();
};
@ -293,6 +257,82 @@ MemCollection::ApplyDeleteToFile() {
return segments_op->Push();
}
Status
MemCollection::CreateDeletedDocsBloomFilter(const std::shared_ptr<snapshot::CompoundSegmentsOperation>& segments_op,
const snapshot::ScopedSnapshotT& ss, engine::SegmentVisitorPtr& seg_visitor,
const std::unordered_set<engine::offset_t>& del_offsets,
uint64_t new_deleted, segment::IdBloomFilterPtr& bloom_filter) {
// Step 1: Mark previous deleted docs file and bloom filter file stale
const snapshot::SegmentPtr& segment = seg_visitor->GetSegment();
auto& field_visitors_map = seg_visitor->GetFieldVisitors();
auto uid_field_visitor = seg_visitor->GetFieldVisitor(engine::FIELD_UID);
auto del_doc_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_DELETED_DOCS);
auto del_docs_element = del_doc_visitor->GetElement();
auto blm_filter_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_BLOOM_FILTER);
auto blm_filter_element = blm_filter_visitor->GetElement();
auto segment_file_executor = [&](const snapshot::SegmentFilePtr& segment_file,
snapshot::SegmentFileIterator* iterator) -> Status {
if (segment_file->GetSegmentId() == segment->GetID() &&
(segment_file->GetFieldElementId() == del_docs_element->GetID() ||
segment_file->GetFieldElementId() == blm_filter_element->GetID())) {
segments_op->AddStaleSegmentFile(segment_file);
}
return Status::OK();
};
auto segment_file_iterator = std::make_shared<snapshot::SegmentFileIterator>(ss, segment_file_executor);
segment_file_iterator->Iterate();
STATUS_CHECK(segment_file_iterator->GetStatus());
// Step 2: Create new deleted docs file and bloom filter file
snapshot::SegmentFileContext del_file_context;
del_file_context.field_name = uid_field_visitor->GetField()->GetName();
del_file_context.field_element_name = del_docs_element->GetName();
del_file_context.collection_id = segment->GetCollectionId();
del_file_context.partition_id = segment->GetPartitionId();
del_file_context.segment_id = segment->GetID();
snapshot::SegmentFilePtr delete_file;
STATUS_CHECK(segments_op->CommitNewSegmentFile(del_file_context, delete_file));
std::string collection_root_path = options_.meta_.path_ + COLLECTIONS_FOLDER;
auto segment_writer = std::make_shared<segment::SegmentWriter>(options_.meta_.path_, seg_visitor);
std::string del_docs_path = snapshot::GetResPath<snapshot::SegmentFile>(collection_root_path, delete_file);
snapshot::SegmentFileContext bloom_file_context;
bloom_file_context.field_name = uid_field_visitor->GetField()->GetName();
bloom_file_context.field_element_name = blm_filter_element->GetName();
bloom_file_context.collection_id = segment->GetCollectionId();
bloom_file_context.partition_id = segment->GetPartitionId();
bloom_file_context.segment_id = segment->GetID();
engine::snapshot::SegmentFile::Ptr bloom_filter_file;
STATUS_CHECK(segments_op->CommitNewSegmentFile(bloom_file_context, bloom_filter_file));
std::string bloom_filter_file_path =
snapshot::GetResPath<snapshot::SegmentFile>(collection_root_path, bloom_filter_file);
// Step 3: update delete docs and bloom filter
STATUS_CHECK(segments_op->CommitRowCountDelta(segment->GetID(), new_deleted, true));
std::vector<int32_t> vec_del_offsets;
vec_del_offsets.reserve(del_offsets.size());
for (auto offset : del_offsets) {
vec_del_offsets.push_back(offset);
}
auto delete_docs = std::make_shared<segment::DeletedDocs>(vec_del_offsets);
STATUS_CHECK(segment_writer->WriteDeletedDocs(del_docs_path, delete_docs));
STATUS_CHECK(segment_writer->WriteBloomFilter(bloom_filter_file_path, bloom_filter));
delete_file->SetSize(CommonUtil::GetFileSize(del_docs_path + codec::DeletedDocsFormat::FilePostfix()));
bloom_filter_file->SetSize(
CommonUtil::GetFileSize(bloom_filter_file_path + codec::IdBloomFilterFormat::FilePostfix()));
return Status::OK();
}
int64_t
MemCollection::GetCollectionId() const {
return collection_id_;

View File

@ -22,7 +22,9 @@
#include <vector>
#include "config/ConfigMgr.h"
#include "db/SnapshotVisitor.h"
#include "db/insert/MemSegment.h"
#include "db/snapshot/Snapshots.h"
#include "utils/Status.h"
namespace milvus {
@ -43,6 +45,9 @@ class MemCollection {
Status
Delete(const std::vector<idx_t>& ids, idx_t op_id);
size_t
DeleteCount() const;
Status
EraseMem(int64_t partition_id);
@ -59,6 +64,12 @@ class MemCollection {
Status
ApplyDeleteToFile();
Status
CreateDeletedDocsBloomFilter(const std::shared_ptr<snapshot::CompoundSegmentsOperation>& segments_op,
const snapshot::ScopedSnapshotT& ss, engine::SegmentVisitorPtr& seg_visitor,
const std::unordered_set<engine::offset_t>& del_offsets, uint64_t new_deleted,
segment::IdBloomFilterPtr& bloom_filter);
private:
int64_t collection_id_ = 0;
DBOptions options_;

View File

@ -44,14 +44,8 @@ class MemManager {
virtual Status
EraseMem(int64_t collection_id, int64_t partition_id) = 0;
virtual size_t
GetCurrentMutableMem() = 0;
virtual size_t
GetCurrentImmutableMem() = 0;
virtual size_t
GetCurrentMem() = 0;
virtual bool
RequireFlush(std::set<int64_t>& collection_ids) = 0;
};
using MemManagerPtr = std::shared_ptr<MemManager>;

View File

@ -281,6 +281,20 @@ MemManagerImpl::EraseMem(int64_t collection_id, int64_t partition_id) {
return Status::OK();
}
bool
MemManagerImpl::RequireFlush(std::set<int64_t>& collection_ids) {
bool require_flush = false;
if (GetCurrentMem() > options_.insert_buffer_size_) {
std::lock_guard<std::mutex> lock(mem_mutex_);
for (auto& kv : mem_map_) {
collection_ids.insert(kv.first);
}
require_flush = true;
}
return require_flush;
}
size_t
MemManagerImpl::GetCurrentMutableMem() {
size_t total_mem = 0;

View File

@ -56,16 +56,19 @@ class MemManagerImpl : public MemManager {
Status
EraseMem(int64_t collection_id, int64_t partition_id) override;
size_t
GetCurrentMutableMem() override;
size_t
GetCurrentImmutableMem() override;
size_t
GetCurrentMem() override;
bool
RequireFlush(std::set<int64_t>& collection_ids) override;
private:
size_t
GetCurrentMutableMem();
size_t
GetCurrentImmutableMem();
size_t
GetCurrentMem();
MemCollectionPtr
GetMemByCollection(int64_t collection_id);

View File

@ -59,6 +59,18 @@ MemSegment::Delete(const std::vector<idx_t>& ids, idx_t op_id) {
return Status::OK();
}
// previous action is delete? combine delete action
if (!actions_.empty()) {
MemAction& pre_action = *actions_.rbegin();
if (!pre_action.delete_ids_.empty()) {
for (auto& id : ids) {
pre_action.delete_ids_.insert(id);
}
return Status::OK();
}
}
// create new action
MemAction action;
action.op_id_ = op_id;
for (auto& id : ids) {

View File

@ -10,6 +10,7 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/merge/MergeManagerImpl.h"
#include "db/SnapshotUtils.h"
#include "db/merge/MergeLayerStrategy.h"
#include "db/merge/MergeSimpleStrategy.h"
#include "db/merge/MergeTask.h"
@ -79,11 +80,8 @@ MergeManagerImpl::MergeSegments(int64_t collection_id, MergeStrategyType type) {
// get row count per segment
auto collection = latest_ss->GetCollection();
int64_t row_count_per_segment = DEFAULT_SEGMENT_ROW_COUNT;
const json params = collection->GetParams();
if (params.find(PARAM_SEGMENT_ROW_COUNT) != params.end()) {
row_count_per_segment = params[PARAM_SEGMENT_ROW_COUNT];
}
int64_t row_count_per_segment = 0;
GetSegmentRowCount(collection, row_count_per_segment);
// distribute segments to groups by some strategy
SegmentGroups segment_groups;

View File

@ -13,6 +13,7 @@
#include "db/Utils.h"
#include "db/wal/WalOperationCodec.h"
#include "utils/CommonUtil.h"
#include "utils/Log.h"
#include <map>
#include <memory>
@ -180,6 +181,8 @@ Status
WalManager::Recovery(const DBPtr& db, const CollectionMaxOpIDMap& max_op_ids) {
WaitCleanupFinish();
LOG_ENGINE_DEBUG_ << "Begin wal recovery";
try {
using DirectoryIterator = std::experimental::filesystem::recursive_directory_iterator;
DirectoryIterator iter_outer(wal_path_);
@ -240,6 +243,8 @@ WalManager::Recovery(const DBPtr& db, const CollectionMaxOpIDMap& max_op_ids) {
return Status(DB_ERROR, msg);
}
LOG_ENGINE_DEBUG_ << "Wal recovery finished";
return Status::OK();
}

View File

@ -19,6 +19,7 @@
#include "db/SnapshotUtils.h"
#include "db/snapshot/Snapshots.h"
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
#include "segment/Utils.h"
#include "utils/Log.h"
#include <algorithm>
@ -300,8 +301,11 @@ Segment::DeleteEntity(std::vector<offset_t>& offsets) {
if (offsets.size() == 0) {
return Status::OK();
}
// sort offset in descendant
std::sort(offsets.begin(), offsets.end(), std::greater<>());
// calculate copy ranges
int64_t delete_count = 0;
segment::CopyRanges copy_ranges;
segment::CalcCopyRangesWithOffset(offsets, row_count_, copy_ranges, delete_count);
// delete entity data from max offset to min offset
for (auto& pair : fixed_fields_) {
@ -311,20 +315,14 @@ Segment::DeleteEntity(std::vector<offset_t>& offsets) {
}
auto& data = pair.second;
for (auto offset : offsets) {
if (offset >= 0 && offset < row_count_) {
auto step = offset * width;
data->data_.erase(data->data_.begin() + step, data->data_.begin() + step + width);
}
}
std::vector<uint8_t> new_data;
segment::CopyDataWithRanges(data->data_, width, copy_ranges, new_data);
data->data_.swap(new_data);
}
// reset row count
for (auto offset : offsets) {
if (offset >= 0 && offset < row_count_) {
row_count_--;
}
}
row_count_ -= delete_count;
return Status::OK();
}

View File

@ -237,7 +237,7 @@ SegmentWriter::Merge(const SegmentReaderPtr& segment_reader) {
LOG_ENGINE_DEBUG_ << "Merging from " << segment_reader->GetSegmentPath() << " to " << GetSegmentPath();
TimeRecorderAuto recorder("SegmentWriter::Merge");
TimeRecorder recorder("SegmentWriter::Merge");
// load raw data
// After load fields, the data has been cached in segment.
@ -259,6 +259,8 @@ SegmentWriter::Merge(const SegmentReaderPtr& segment_reader) {
return status;
}
recorder.RecordSection("load data");
// the source segment may be used in search, we can't change its data, so copy a new segment for merging
engine::SegmentPtr duplicated_segment = std::make_shared<engine::Segment>();
src_segment->CopyOutRawData(duplicated_segment);
@ -267,6 +269,8 @@ SegmentWriter::Merge(const SegmentReaderPtr& segment_reader) {
duplicated_segment->DeleteEntity(delete_ids);
}
recorder.RecordSection("delete entities");
// convert to DataChunk
engine::DataChunkPtr chunk = std::make_shared<engine::DataChunk>();
duplicated_segment->ShareToChunkData(chunk);
@ -280,6 +284,8 @@ SegmentWriter::Merge(const SegmentReaderPtr& segment_reader) {
// clear cache of merged segment
segment_reader->ClearCache();
recorder.ElapseFromBegin("done");
// Note: no need to merge bloom filter, the bloom filter will be created during serialize
return Status::OK();

104
core/src/segment/Utils.cpp Normal file
View File

@ -0,0 +1,104 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "segment/Utils.h"
#include <set>
#include <unordered_set>
#include <utility>
#include <vector>
#include "utils/Log.h"
namespace milvus {
namespace segment {
bool
CalcCopyRangesWithOffset(const std::vector<int32_t>& offsets, int64_t row_count, CopyRanges& copy_ranges,
int64_t& delete_count) {
copy_ranges.clear();
if (offsets.empty() || row_count <= 0) {
return false;
}
// arrange offsets
std::set<int32_t> new_offsets;
for (auto offset : offsets) {
if (offset < 0 || offset >= row_count) {
continue;
}
new_offsets.insert(offset);
}
delete_count = new_offsets.size();
if (delete_count == 0) {
return true;
}
// if the first offset is not zero, add a range [0, first]
int32_t first = *new_offsets.begin();
if (first > 0) {
copy_ranges.push_back(std::make_pair(0, first));
}
// calculate inner range
int32_t prev = *new_offsets.begin();
for (auto offset : new_offsets) {
if (offset - prev == 1) {
prev = offset;
continue;
} else {
if (prev != offset) {
copy_ranges.push_back(std::make_pair(prev + 1, offset));
}
}
prev = offset;
}
// if the last offset is not the last row, add a range [last + 1, row_count]
int32_t last = *new_offsets.rbegin();
if (last < row_count - 1) {
copy_ranges.push_back(std::make_pair(last + 1, row_count));
}
return true;
}
bool
CopyDataWithRanges(const std::vector<uint8_t>& src_data, int64_t row_width, const CopyRanges& copy_ranges,
std::vector<uint8_t>& target_data) {
target_data.clear();
if (src_data.empty() || copy_ranges.empty() || row_width <= 0) {
return false;
}
// calculate result bytes
int64_t bytes = 0;
for (auto& pair : copy_ranges) {
if (pair.second <= pair.first) {
continue;
}
bytes += (pair.second - pair.first) * row_width;
}
target_data.resize(bytes);
// copy data to result
size_t poz = 0;
for (auto& pair : copy_ranges) {
size_t len = (pair.second - pair.first) * row_width;
memcpy(target_data.data() + poz, src_data.data() + pair.first * row_width, len);
poz += len;
}
return true;
}
} // namespace segment
} // namespace milvus

53
core/src/segment/Utils.h Normal file
View File

@ -0,0 +1,53 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <ctime>
#include <string>
#include <utility>
#include <vector>
namespace milvus {
namespace segment {
using CopyRanges = std::vector<std::pair<int32_t, int32_t>>;
// calculate copy range according to deleted offsets
// for example:
// segment row count is 100, the deleted offsets is: {1,2,3, 6, 9,10}
// the copy ranges will be:
// {
// {0, 1}
// {4, 6}
// {7, 9}
// {11, 100}
// }
bool
CalcCopyRangesWithOffset(const std::vector<int32_t>& offsets, int64_t row_count, CopyRanges& copy_ranges,
int64_t& delete_count);
// copy data from source data according to copy ranges
// for example:
// each row_with is 8 bytes
// src_data has 100 rows, means 800 bytes
// the copy ranges is:
// {
// {0, 10}
// {50, 90}
// }
// then the target_data will have (10 - 0) * 8 + (90 - 50) * 8 = 400 bytes copied from src_data
bool
CopyDataWithRanges(const std::vector<uint8_t>& src_data, int64_t row_width, const CopyRanges& copy_ranges,
std::vector<uint8_t>& target_data);
} // namespace segment
} // namespace milvus

View File

@ -10,6 +10,7 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "server/ValidationUtil.h"
#include "config/ServerConfig.h"
#include "db/Constants.h"
#include "db/Utils.h"
#include "knowhere/index/vector_index/ConfAdapter.h"
@ -343,9 +344,11 @@ ValidateIndexParams(const milvus::json& index_params, int64_t dimension, const s
Status
ValidateSegmentRowCount(int64_t segment_row_count) {
if (segment_row_count <= 0 || segment_row_count > engine::MAX_SEGMENT_ROW_COUNT) {
int64_t min = config.engine.build_index_threshold.value;
int max = engine::MAX_SEGMENT_ROW_COUNT;
if (segment_row_count < min || segment_row_count > max) {
std::string msg = "Invalid segment row count: " + std::to_string(segment_row_count) + ". " +
"Should be in range 1 ~ " + std::to_string(engine::MAX_SEGMENT_ROW_COUNT) + ".";
"Should be in range " + std::to_string(min) + " ~ " + std::to_string(max) + ".";
LOG_SERVER_ERROR_ << msg;
return Status(SERVER_INVALID_SEGMENT_ROW_COUNT, msg);
}

View File

@ -13,6 +13,7 @@
#include <fiu/fiu-local.h>
#include <gtest/gtest.h>
#include <random>
#include <string>
#include <experimental/filesystem>
@ -27,6 +28,7 @@
#include "segment/SegmentReader.h"
#include "segment/SegmentWriter.h"
#include "segment/IdBloomFilter.h"
#include "segment/Utils.h"
#include "storage/disk/DiskIOReader.h"
#include "storage/disk/DiskIOWriter.h"
#include "utils/Json.h"
@ -46,20 +48,29 @@ CreateCollection(std::shared_ptr<DB> db, const std::string& collection_name, con
int64_t collection_id = 0;
int64_t field_id = 0;
/* field uid */
auto uid_field = std::make_shared<Field>(milvus::engine::FIELD_UID, 0,
milvus::engine::DataType::INT64, milvus::engine::snapshot::JEmpty, field_id);
auto uid_field_element_blt = std::make_shared<FieldElement>(collection_id, field_id,
milvus::engine::ELEMENT_BLOOM_FILTER, milvus::engine::FieldElementType::FET_BLOOM_FILTER);
auto uid_field_element_del = std::make_shared<FieldElement>(collection_id, field_id,
milvus::engine::ELEMENT_DELETED_DOCS, milvus::engine::FieldElementType::FET_DELETED_DOCS);
auto uid_field = std::make_shared<Field>(milvus::engine::FIELD_UID,
0,
milvus::engine::DataType::INT64,
milvus::engine::snapshot::JEmpty,
field_id);
auto uid_field_element_blt = std::make_shared<FieldElement>(collection_id,
field_id,
milvus::engine::ELEMENT_BLOOM_FILTER,
milvus::engine::FieldElementType::FET_BLOOM_FILTER);
auto uid_field_element_del = std::make_shared<FieldElement>(collection_id,
field_id,
milvus::engine::ELEMENT_DELETED_DOCS,
milvus::engine::FieldElementType::FET_DELETED_DOCS);
field_id++;
/* field vector */
milvus::json vector_param = {{milvus::knowhere::meta::DIM, 4}};
auto vector_field = std::make_shared<Field>("vector", 0, milvus::engine::DataType::VECTOR_FLOAT, vector_param,
field_id);
auto vector_field_element_index = std::make_shared<FieldElement>(collection_id, field_id,
milvus::knowhere::IndexEnum::INDEX_FAISS_IVFSQ8, milvus::engine::FieldElementType::FET_INDEX);
field_id);
auto vector_field_element_index = std::make_shared<FieldElement>(collection_id,
field_id,
milvus::knowhere::IndexEnum::INDEX_FAISS_IVFSQ8,
milvus::engine::FieldElementType::FET_INDEX);
context.fields_schema[uid_field] = {uid_field_element_blt, uid_field_element_del};
context.fields_schema[vector_field] = {vector_field_element_index};
@ -189,7 +200,7 @@ TEST(BloomFilterTest, ReadWriteTest) {
}
double error_rate = filter.ErrorRate();
double wrong_rate = (double)wrong_check/id_count;
double wrong_rate = (double)wrong_check / id_count;
ASSERT_LT(wrong_rate, error_rate);
};
@ -203,7 +214,7 @@ TEST(BloomFilterTest, ReadWriteTest) {
}
double error_rate = filter.ErrorRate();
double wrong_rate = (double)wrong_check/id_count;
double wrong_rate = (double)wrong_check / id_count;
ASSERT_LT(wrong_rate, error_rate);
};
@ -310,7 +321,7 @@ TEST(BloomFilterTest, CloneTest) {
}
double error_rate = filter->ErrorRate();
double wrong_rate = (double)wrong_check/id_count;
double wrong_rate = (double)wrong_check / id_count;
ASSERT_LT(wrong_rate, error_rate);
};
@ -322,3 +333,132 @@ TEST(BloomFilterTest, CloneTest) {
error_rate_check(clone_filter, removed_id_array);
}
TEST(SegmentUtilTest, CalcCopyRangeTest) {
// invalid input test
std::vector<int32_t> offsets;
int64_t row_count = 0, delete_count = 0;
milvus::segment::CopyRanges copy_ranges;
bool res = milvus::segment::CalcCopyRangesWithOffset(offsets, row_count, copy_ranges, delete_count);
ASSERT_FALSE(res);
row_count = 100;
auto compare_result =
[&](const std::vector<int32_t>& offsets, const milvus::segment::CopyRanges& compare_range) -> void {
milvus::segment::CopyRanges copy_ranges;
res = milvus::segment::CalcCopyRangesWithOffset(offsets, row_count, copy_ranges, delete_count);
ASSERT_TRUE(res);
int64_t compare_count = 0;
for (auto offset : offsets) {
if (offset >= 0 && offset < row_count) {
compare_count++;
}
}
ASSERT_EQ(delete_count, compare_count);
ASSERT_EQ(copy_ranges.size(), compare_range.size());
for (size_t i = 0; i < copy_ranges.size(); ++i) {
ASSERT_EQ(copy_ranges[i], compare_range[i]);
}
};
{
offsets = {0, 1, 2, 99, 100};
milvus::segment::CopyRanges compare = {
{3, 99}
};
compare_result(offsets, compare);
}
{
offsets = {-1, 5, 4, 3, 90, 91};
milvus::segment::CopyRanges compare = {
{0, 3},
{6, 90},
{92, 100},
};
compare_result(offsets, compare);
}
}
TEST(SegmentUtilTest, CopyRangeDataTest) {
auto compare_result = [&](std::vector<uint8_t>& src_data,
std::vector<int32_t>& offsets,
int64_t row_count,
int64_t row_width) -> void {
int64_t delete_count = 0;
milvus::segment::CopyRanges copy_ranges;
auto res = milvus::segment::CalcCopyRangesWithOffset(offsets, row_count, copy_ranges, delete_count);
ASSERT_TRUE(res);
if (copy_ranges.empty()) {
return;
}
std::vector<uint8_t> target_data;
res = milvus::segment::CopyDataWithRanges(src_data, row_width, copy_ranges, target_data);
ASSERT_TRUE(res);
// erase element from the largest offset
std::vector<uint8_t> compare_data = src_data;
std::set<int32_t> arrange_offsets;
for (auto offset : offsets) {
if (offset >= 0 && offset < row_count) {
arrange_offsets.insert(offset);
}
}
for (auto iter = arrange_offsets.rbegin(); iter != arrange_offsets.rend(); ++iter) {
auto step = (*iter) * row_width;
compare_data.erase(compare_data.begin() + step, compare_data.begin() + step + row_width);
}
ASSERT_EQ(target_data, compare_data);
};
// invalid input test
std::vector<int32_t> offsets;
std::vector<uint8_t> src_data;
int64_t row_width = 0;
milvus::segment::CopyRanges copy_ranges;
std::vector<uint8_t> target_data;
bool res = milvus::segment::CopyDataWithRanges(src_data, row_width, copy_ranges, target_data);
ASSERT_FALSE(res);
// construct source data
row_width = 64;
int64_t row_count = 100;
src_data.resize(row_count * row_width);
for (int64_t i = 0; i < row_count * row_width; ++i) {
src_data[i] = i % 255;
}
{
offsets = {0, 1, 2, 99, 100};
compare_result(src_data, offsets, row_count, row_width);
}
{
offsets = {-1, 5, 4, 3, 90, 91};
compare_result(src_data, offsets, row_count, row_width);
}
// random test
for (int32_t i = 0; i < 10; ++i) {
std::default_random_engine random;
row_count = random() % 100 + 1;
row_width = random() % 8 + 8;
src_data.resize(row_count * row_width);
for (int64_t i = 0; i < row_count * row_width; ++i) {
src_data[i] = i % 255;
}
int64_t offset_count = (row_count > 1) ? (random() % row_count + 1) : 1;
offsets.resize(offset_count);
for (int64_t k = 0; k < offset_count; ++k) {
offsets[k] = (random() % row_count) + ((k % 2 == 0) ? 2 : -2);
}
compare_result(src_data, offsets, row_count, row_width);
}
}

View File

@ -468,10 +468,9 @@ def add_vector_field(nb, dimension=dimension):
def gen_segment_row_counts():
sizes = [
1,
2,
1024,
4096
4096,
8192,
1000000,
]
return sizes