#3722 rename segment_row_count to segment_row_limit (#3724)

* #3722 rename segment_row_count to segment_row_limit

Signed-off-by: groot <yihua.mo@zilliz.com>

* fix unittest

Signed-off-by: groot <yihua.mo@zilliz.com>

* fix unittest

Signed-off-by: groot <yihua.mo@zilliz.com>

* fix unittest 3

Signed-off-by: groot <yihua.mo@zilliz.com>

* wal issue

Signed-off-by: groot <yihua.mo@zilliz.com>

* bloom magic number

Signed-off-by: groot <yihua.mo@zilliz.com>

* typo

Signed-off-by: groot <yihua.mo@zilliz.com>

* add log

Signed-off-by: groot <yihua.mo@zilliz.com>
groot 2020-09-15 14:33:11 +08:00 committed by GitHub
parent b6010dfd1f
commit 6ce00fb043
13 changed files with 99 additions and 29 deletions

View File

@@ -27,7 +27,7 @@ const char* PARAM_DIMENSION = knowhere::meta::DIM;
const char* PARAM_INDEX_TYPE = "index_type";
const char* PARAM_INDEX_METRIC_TYPE = knowhere::Metric::TYPE;
const char* PARAM_INDEX_EXTRA_PARAMS = "params";
-const char* PARAM_SEGMENT_ROW_COUNT = "segment_row_count";
+const char* PARAM_SEGMENT_ROW_COUNT = "segment_row_limit";
const char* DEFAULT_STRUCTURED_INDEX = "SORTED"; // this string should be defined in knowhere::IndexEnum
const char* DEFAULT_PARTITON_TAG = "_default";
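Only the client-facing key string changes here; the C++ identifier PARAM_SEGMENT_ROW_COUNT keeps its name. A minimal standalone sketch of the effect on request parsing, assuming an nlohmann-style JSON API (this parser is illustrative, not the server's actual code path):

```cpp
#include <cstdint>
#include <iostream>
#include <nlohmann/json.hpp>

// the server-side identifier keeps its old name; only the key string changes
const char* PARAM_SEGMENT_ROW_COUNT = "segment_row_limit";

int main() {
    // a request body using the new key; "segment_row_count" would no longer be found
    auto req = nlohmann::json::parse(R"({"segment_row_limit": 100000})");
    std::cout << req[PARAM_SEGMENT_ROW_COUNT].get<int64_t>() << std::endl;
    return 0;
}
```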

View File

@@ -12,8 +12,10 @@
#include "db/wal/WalFile.h"
#include "db/Constants.h"
#include "db/Types.h"
+#include "utils/CommonUtil.h"
#include "utils/Log.h"
+#include <experimental/filesystem>
#include <limits>

namespace milvus {
@@ -42,6 +44,12 @@ WalFile::OpenFile(const std::string& path, OpenMode mode) {
        default:
            return Status(DB_ERROR, "Unsupported file mode");
    }

+    // makesure the parent path is created
+    std::experimental::filesystem::path temp_path(path);
+    auto parent_path = temp_path.parent_path();
+    CommonUtil::CreateDirectory(parent_path.c_str());
+
    file_ = fopen(path.c_str(), str_mode.c_str());
    if (file_ == nullptr) {
        std::string msg = "Failed to create wal file: " + path;
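The fix makes WalFile::OpenFile create the missing parent folder before fopen, so opening a brand-new wal file under /xxx/milvus/wal/[collection_name]/ no longer fails when the collection folder does not exist yet. A minimal sketch of the same pattern, using std::experimental::filesystem::create_directories as a stand-in for CommonUtil::CreateDirectory (whose exact behavior this diff does not show):

```cpp
#include <cstdio>
#include <experimental/filesystem>
#include <string>

namespace fs = std::experimental::filesystem;

// open `path` for writing, creating any missing parent folders first
FILE* OpenCreatingParents(const std::string& path, const char* str_mode) {
    fs::path parent = fs::path(path).parent_path();
    if (!parent.empty()) {
        fs::create_directories(parent);  // recursive; no-op if the folder already exists
    }
    return std::fopen(path.c_str(), str_mode);
}
```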

View File

@@ -181,6 +181,10 @@ Status
WalManager::Recovery(const DBPtr& db, const CollectionMaxOpIDMap& max_op_ids) {
    WaitCleanupFinish();

+    if (db == nullptr) {
+        return Status(DB_ERROR, "null pointer");
+    }
+
    LOG_ENGINE_DEBUG_ << "Begin wal recovery";
    try {
@@ -238,6 +242,9 @@ WalManager::Recovery(const DBPtr& db, const CollectionMaxOpIDMap& max_op_ids) {
                }
            }
        }

+        // flush to makesure data is serialized
+        return db->Flush();
    } catch (std::exception& ex) {
        std::string msg = "Failed to recovery wal, reason: " + std::string(ex.what());
        return Status(DB_ERROR, msg);
@@ -375,7 +382,6 @@ WalManager::ConstructFilePath(const std::string& collection_name, const std::str
    // typically, the wal file path is like: /xxx/milvus/wal/[collection_name]/xxxxxxxxxx
    std::experimental::filesystem::path full_path(wal_path_);
    full_path.append(collection_name);
-    std::experimental::filesystem::create_directory(full_path);
    full_path.append(file_name);

    std::string path(full_path.c_str());
@@ -464,10 +470,11 @@ WalManager::CleanupThread() {
        {
            std::lock_guard<std::mutex> lock(file_map_mutex_);
            file_map_.erase(target_collection);
-        }
-        // remove collection folder
-        std::experimental::filesystem::remove_all(collection_path);
+            // remove collection folder
+            // do this under the lock to avoid multi-thread conflict
+            std::experimental::filesystem::remove_all(collection_path);
+        }
        TakeCleanupTask(target_collection);
        continue;
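Two things change in WalManager: Recovery() now rejects a null DB pointer and flushes after replay so the recovered data is actually serialized, and CleanupThread() removes the collection folder while still holding file_map_mutex_, closing the window in which another thread could recreate a wal file between the map erase and the directory removal. A minimal sketch of that lock-scope change (names simplified and the map type hypothetical; the real code also drives a cleanup task queue):

```cpp
#include <experimental/filesystem>
#include <mutex>
#include <string>
#include <unordered_map>

std::mutex file_map_mutex_;
std::unordered_map<std::string, int> file_map_;  // collection -> wal file handle (placeholder)

void DropCollectionWal(const std::string& collection, const std::string& collection_path) {
    std::lock_guard<std::mutex> lock(file_map_mutex_);
    file_map_.erase(collection);
    // removing the folder inside the same critical section prevents another
    // thread from re-creating a wal file between the erase and the remove_all
    std::experimental::filesystem::remove_all(collection_path);
}
```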

View File

@@ -29,6 +29,9 @@ namespace segment {
constexpr double BLOOM_FILTER_ERROR_RATE = 0.01;
constexpr int64_t CAPACITY_EXPAND = 1024;

+// the magic num is converted from string "bloom_0"
+constexpr int64_t BLOOM_FILE_MAGIC_NUM = 0x305F6D6F6F6C62;
+
IdBloomFilter::IdBloomFilter(int64_t capacity) : capacity_(capacity + CAPACITY_EXPAND) {
}

@@ -117,6 +120,7 @@ IdBloomFilter::Write(const storage::FSHandlerPtr& fs_ptr) {
    scaling_bloom_t* bloom_filter = GetBloomFilter();
    try {
+        fs_ptr->writer_ptr_->Write(&(BLOOM_FILE_MAGIC_NUM), sizeof(BLOOM_FILE_MAGIC_NUM));
        fs_ptr->writer_ptr_->Write(&(bloom_filter->capacity), sizeof(bloom_filter->capacity));
        fs_ptr->writer_ptr_->Write(&(bloom_filter->error_rate), sizeof(bloom_filter->error_rate));
        fs_ptr->writer_ptr_->Write(&(bloom_filter->bitmap->bytes), sizeof(bloom_filter->bitmap->bytes));
@@ -137,6 +141,13 @@ IdBloomFilter::Read(const storage::FSHandlerPtr& fs_ptr) {
    FreeBloomFilter();
    try {
+        int64_t magic_num = 0;
+        fs_ptr->reader_ptr_->Read(&magic_num, sizeof(magic_num));
+        if (magic_num != BLOOM_FILE_MAGIC_NUM) {
+            LOG_ENGINE_ERROR_ << "legacy bloom filter file, could not read bloom filter data";
+            return Status(DB_ERROR, "");
+        }
+
        unsigned int capacity = 0;
        fs_ptr->reader_ptr_->Read(&capacity, sizeof(capacity));
        capacity_ = capacity;
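The magic number versions the bloom-filter file: new files start with the 8-byte tag, and files written before this change fail the check and are reported as legacy files instead of being misread. The constant is simply the string "bloom_0" packed into an int64 little-endian, which this sketch verifies (assuming a little-endian host, as the comment in the diff implies):

```cpp
#include <cstdint>
#include <cstdio>
#include <cstring>

int main() {
    constexpr int64_t kBloomFileMagicNum = 0x305F6D6F6F6C62;  // 7 ASCII bytes, high byte zero
    char text[sizeof(kBloomFileMagicNum) + 1] = {0};
    std::memcpy(text, &kBloomFileMagicNum, sizeof(kBloomFileMagicNum));
    std::printf("%s\n", text);  // prints "bloom_0" on a little-endian machine
    return 0;
}
```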

View File

@@ -33,6 +33,7 @@
#include "storage/disk/DiskIOWriter.h"
#include "storage/disk/DiskOperation.h"
#include "utils/Log.h"
+#include "utils/TimeRecorder.h"

namespace milvus {
namespace segment {
@@ -105,6 +106,8 @@ SegmentReader::Load() {
Status
SegmentReader::LoadField(const std::string& field_name, engine::BinaryDataPtr& raw, bool to_cache) {
    try {
+        TimeRecorder recorder("SegmentReader::LoadField: " + field_name);
+
        segment_ptr_->GetFixedFieldData(field_name, raw);
        if (raw != nullptr) {
            return Status::OK(); // already exist
@@ -133,6 +136,8 @@ SegmentReader::LoadField(const std::string& field_name, engine::BinaryDataPtr& r
        }
        segment_ptr_->SetFixedFieldData(field_name, raw);
+
+        recorder.RecordSection("read " + file_path);
    } catch (std::exception& e) {
        std::string err_msg = "Failed to load raw vectors: " + std::string(e.what());
        LOG_ENGINE_ERROR_ << err_msg;
@@ -162,6 +167,8 @@ Status
SegmentReader::LoadEntities(const std::string& field_name, const std::vector<int64_t>& offsets,
                            engine::BinaryDataPtr& raw) {
    try {
+        TimeRecorderAuto recorder("SegmentReader::LoadEntities: " + field_name);
+
        auto field_visitor = segment_visitor_->GetFieldVisitor(field_name);
        if (field_visitor == nullptr) {
            return Status(DB_ERROR, "Invalid field_name");
@@ -247,6 +254,8 @@ SegmentReader::LoadUids(std::vector<engine::idx_t>& uids) {
Status
SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndexPtr& index_ptr, bool flat) {
    try {
+        TimeRecorder recorder("SegmentReader::LoadVectorIndex: " + field_name);
+
        segment_ptr_->GetVectorIndex(field_name, index_ptr);
        if (index_ptr != nullptr) {
            return Status::OK(); // already exist
@@ -274,6 +283,7 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
                concurrent_bitset_ptr->set(offset);
            }
        }
+        recorder.RecordSection("prepare");

        knowhere::BinarySet index_data;
        knowhere::BinaryPtr raw_data, compress_data;
@@ -318,6 +328,7 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
                cache::CpuCacheMgr::GetInstance().InsertItem(temp_index_path, index_ptr);
            }

+            recorder.RecordSection("create temp IDMAP index");
            return Status::OK();
        }
@@ -334,6 +345,7 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
        }
        STATUS_CHECK(ss_codec.GetVectorIndexFormat()->ReadIndex(fs_ptr_, index_file_path, index_data));
+        recorder.RecordSection("read index file: " + index_file_path);

        // for some kinds index(IVF), read raw file
        auto index_type = index_visitor->GetElement()->GetTypeName();
@@ -346,6 +358,8 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
                auto file_path =
                    engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, visitor->GetFile());
                STATUS_CHECK(ss_codec.GetVectorIndexFormat()->ReadRaw(fs_ptr_, file_path, raw_data));
+
+                recorder.RecordSection("read raw file: " + file_path);
            }
        }
@@ -355,6 +369,8 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
                auto file_path =
                    engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, visitor->GetFile());
                STATUS_CHECK(ss_codec.GetVectorIndexFormat()->ReadCompress(fs_ptr_, file_path, compress_data));
+
+                recorder.RecordSection("read compress file: " + file_path);
            }
        }
@@ -378,6 +394,8 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
Status
SegmentReader::LoadStructuredIndex(const std::string& field_name, knowhere::IndexPtr& index_ptr) {
    try {
+        TimeRecorderAuto recorder("SegmentReader::LoadStructuredIndex");
+
        segment_ptr_->GetStructuredIndex(field_name, index_ptr);
        if (index_ptr != nullptr) {
            return Status::OK(); // already exist
@@ -449,6 +467,8 @@ SegmentReader::LoadVectorIndice() {
Status
SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
    try {
+        TimeRecorderAuto recorder("SegmentReader::LoadBloomFilter");
+
        id_bloom_filter_ptr = segment_ptr_->GetBloomFilter();
        if (id_bloom_filter_ptr != nullptr) {
            return Status::OK(); // already exist
@@ -486,6 +506,8 @@ SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
Status
SegmentReader::LoadDeletedDocs(segment::DeletedDocsPtr& deleted_docs_ptr) {
    try {
+        TimeRecorder recorder("SegmentReader::LoadDeletedDocs");
+
        deleted_docs_ptr = segment_ptr_->GetDeletedDocs();
        if (deleted_docs_ptr != nullptr) {
            return Status::OK(); // already exist
@@ -591,6 +613,8 @@ SegmentReader::GetTempIndexPath(const std::string& field_name, std::string& path
Status
SegmentReader::ClearCache() {
+    TimeRecorderAuto recorder("SegmentReader::ClearCache");
+
    if (segment_visitor_ == nullptr) {
        return Status::OK();
    }
@@ -641,6 +665,8 @@ SegmentReader::ClearCache() {
Status
SegmentReader::ClearIndexCache(const std::string& field_name) {
+    TimeRecorderAuto recorder("SegmentReader::ClearIndexCache");
+
    if (segment_visitor_ == nullptr) {
        return Status::OK();
    }
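The "add log" commit instruments SegmentReader (and SegmentWriter below) with TimeRecorder/TimeRecorderAuto from utils/TimeRecorder.h. Judging from the calls in this diff, the pattern is a scoped timer: RecordSection() logs the time since the previous mark, and the Auto flavor also logs the total on scope exit. A hypothetical re-implementation of that pattern, not the real class, whose full interface isn't shown here:

```cpp
#include <chrono>
#include <iostream>
#include <string>
#include <utility>

class SectionTimer {
    using Clock = std::chrono::steady_clock;

 public:
    explicit SectionTimer(std::string header)
        : header_(std::move(header)), start_(Clock::now()), last_(start_) {}

    // log the time spent since construction or the previous section mark
    void RecordSection(const std::string& msg) {
        auto now = Clock::now();
        auto us = std::chrono::duration_cast<std::chrono::microseconds>(now - last_).count();
        std::cout << header_ << ": " << msg << " (" << us << " us)" << std::endl;
        last_ = now;
    }

    // the "Auto" flavor additionally logs total elapsed time on destruction
    ~SectionTimer() {
        auto total = std::chrono::duration_cast<std::chrono::microseconds>(Clock::now() - start_).count();
        std::cout << header_ << ": total " << total << " us" << std::endl;
    }

 private:
    std::string header_;
    Clock::time_point start_;
    Clock::time_point last_;
};

int main() {
    SectionTimer timer("SegmentReader::LoadField: demo");
    // ... load from cache or disk here ...
    timer.RecordSection("read file");
    return 0;  // destructor logs the total
}
```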

View File

@@ -91,6 +91,8 @@ SegmentWriter::Serialize() {
Status
SegmentWriter::WriteField(const std::string& file_path, const engine::BinaryDataPtr& raw) {
+    TimeRecorderAuto recorder("SegmentWriter::WriteField: " + file_path);
+
    auto& ss_codec = codec::Codec::instance();
    STATUS_CHECK(ss_codec.GetBlockFormat()->Write(fs_ptr_, file_path, raw));
@@ -118,7 +120,7 @@ SegmentWriter::WriteFields() {
            auto file_size = milvus::CommonUtil::GetFileSize(file_path);
            segment_file->SetSize(file_size);
-            recorder.RecordSection("Serialize field raw file");
+            LOG_ENGINE_DEBUG_ << "Serialize raw file size: " << file_size;
        } else {
            return Status(DB_ERROR, "Raw element missed in snapshot");
        }
@@ -159,6 +161,8 @@ SegmentWriter::WriteBloomFilter() {
            auto file_size = milvus::CommonUtil::GetFileSize(file_path + codec::IdBloomFilterFormat::FilePostfix());
            segment_file->SetSize(file_size);
+
+            LOG_ENGINE_DEBUG_ << "Serialize bloom filter file size: " << file_size;
        } else {
            return Status(DB_ERROR, "Bloom filter element missed in snapshot");
        }
@@ -182,6 +186,8 @@ SegmentWriter::WriteBloomFilter(const std::string& file_path, const IdBloomFilte
Status
SegmentWriter::WriteDeletedDocs() {
+    TimeRecorder recorder("SegmentWriter::WriteDeletedDocs");
+
    auto& field_visitors_map = segment_visitor_->GetFieldVisitors();
    auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::FIELD_UID);
    auto del_doc_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_DELETED_DOCS);
@@ -194,6 +200,8 @@ SegmentWriter::WriteDeletedDocs() {
            auto file_size = milvus::CommonUtil::GetFileSize(file_path + codec::DeletedDocsFormat::FilePostfix());
            segment_file->SetSize(file_size);
+
+            LOG_ENGINE_DEBUG_ << "Serialize deleted docs file size: " << file_size;
        } else {
            return Status(DB_ERROR, "Deleted-doc element missed in snapshot");
        }
@@ -232,7 +240,7 @@ SegmentWriter::Merge(const SegmentReaderPtr& segment_reader) {
        return status;
    }
    if (src_id == target_id) {
-        return Status(DB_ERROR, "Cannot Merge Self");
+        return Status(DB_ERROR, "Cannot merge Self");
    }

    LOG_ENGINE_DEBUG_ << "Merging from " << segment_reader->GetSegmentPath() << " to " << GetSegmentPath();
@@ -330,6 +338,8 @@ SegmentWriter::SetVectorIndex(const std::string& field_name, const milvus::knowh
Status
SegmentWriter::WriteVectorIndex(const std::string& field_name) {
    try {
+        TimeRecorder recorder("SegmentWriter::WriteVectorIndex");
+
        knowhere::VecIndexPtr index;
        auto status = segment_ptr_->GetVectorIndex(field_name, index);
        if (!status.ok() || index == nullptr) {
@@ -355,6 +365,8 @@ SegmentWriter::WriteVectorIndex(const std::string& field_name) {
                auto file_size = milvus::CommonUtil::GetFileSize(file_path + codec::VectorIndexFormat::FilePostfix());
                segment_file->SetSize(file_size);
+
+                recorder.RecordSection("Serialize index file size: " + std::to_string(file_size));
            }
        }
@@ -370,6 +382,8 @@ SegmentWriter::WriteVectorIndex(const std::string& field_name) {
                auto file_size =
                    milvus::CommonUtil::GetFileSize(file_path + codec::VectorCompressFormat::FilePostfix());
                segment_file->SetSize(file_size);
+
+                recorder.RecordSection("Serialize index compress file size: " + std::to_string(file_size));
            }
        }
    } catch (std::exception& e) {
@@ -391,6 +405,8 @@ SegmentWriter::SetStructuredIndex(const std::string& field_name, const knowhere:
Status
SegmentWriter::WriteStructuredIndex(const std::string& field_name) {
    try {
+        TimeRecorder recorder("SegmentWriter::WriteStructuredIndex");
+
        knowhere::IndexPtr index;
        auto status = segment_ptr_->GetStructuredIndex(field_name, index);
        if (!status.ok() || index == nullptr) {
@@ -417,6 +433,8 @@ SegmentWriter::WriteStructuredIndex(const std::string& field_name) {
            auto file_size = milvus::CommonUtil::GetFileSize(file_path + codec::StructuredIndexFormat::FilePostfix());
            segment_file->SetSize(file_size);
+
+            recorder.RecordSection("Serialize structured index file size: " + std::to_string(file_size));
        }
    } catch (std::exception& e) {
        std::string err_msg = "Failed to write vector index: " + std::string(e.what());

View File

@@ -325,6 +325,7 @@ EventTest::TearDown() {
DBOptions
WalTest::GetOptions() {
    DBOptions options;
+    options.meta_.path_ = "/tmp/milvus_ss/db";
    options.meta_.backend_uri_ = "mock://:@:/";
    options.wal_path_ = "/tmp/milvus_wal";
    options.wal_enable_ = true;

View File

@@ -57,7 +57,7 @@ class TestCreateCollection:
        collection_name = gen_unique_str(collection_id)
        fields = {
            "fields": [filter_field, vector_field],
-            "segment_row_count": segment_row_count
+            "segment_row_limit": segment_row_count
        }
        logging.getLogger().info(fields)
        connect.create_collection(collection_name, fields)
@@ -74,7 +74,7 @@ class TestCreateCollection:
        collection_name = gen_unique_str(collection_id)
        fields = {
            "fields": [filter_field, vector_field],
-            "segment_row_count": segment_row_count
+            "segment_row_limit": segment_row_count
        }
        connect.create_collection(collection_name, fields)
        assert connect.has_collection(collection_name)
@@ -87,7 +87,7 @@ class TestCreateCollection:
        '''
        collection_name = gen_unique_str(collection_id)
        fields = copy.deepcopy(default_fields)
-        fields["segment_row_count"] = get_segment_row_count
+        fields["segment_row_limit"] = get_segment_row_count
        connect.create_collection(collection_name, fields)
        assert connect.has_collection(collection_name)
@@ -232,7 +232,7 @@ class TestCreateCollectionInvalid(object):
    def test_create_collection_with_invalid_segment_row_count(self, connect, get_segment_row_count):
        collection_name = gen_unique_str()
        fields = copy.deepcopy(default_fields)
-        fields["segment_row_count"] = get_segment_row_count
+        fields["segment_row_limit"] = get_segment_row_count
        with pytest.raises(Exception) as e:
            connect.create_collection(collection_name, fields)
@@ -292,11 +292,11 @@ class TestCreateCollectionInvalid(object):
        '''
        collection_name = gen_unique_str(collection_id)
        fields = copy.deepcopy(default_fields)
-        fields.pop("segment_row_count")
+        fields.pop("segment_row_limit")
        connect.create_collection(collection_name, fields)
        res = connect.get_collection_info(collection_name)
        logging.getLogger().info(res)
-        assert res["segment_row_count"] == default_segment_row_count
+        assert res["segment_row_limit"] == default_segment_row_count
        # TODO: assert exception

    def test_create_collection_limit_fields(self, connect):

View File

@@ -64,12 +64,12 @@ class TestInfoBase:
        collection_name = gen_unique_str(collection_id)
        fields = {
            "fields": [filter_field, vector_field],
-            "segment_row_count": segment_row_count
+            "segment_row_limit": segment_row_count
        }
        connect.create_collection(collection_name, fields)
        res = connect.get_collection_info(collection_name)
        assert res['auto_id'] == True
-        assert res['segment_row_count'] == segment_row_count
+        assert res['segment_row_limit'] == segment_row_count
        assert len(res["fields"]) == 2
        for field in res["fields"]:
            if field["type"] == filter_field:
@@ -86,11 +86,11 @@ class TestInfoBase:
        '''
        collection_name = gen_unique_str(collection_id)
        fields = copy.deepcopy(default_fields)
-        fields["segment_row_count"] = get_segment_row_count
+        fields["segment_row_limit"] = get_segment_row_count
        connect.create_collection(collection_name, fields)
        # assert segment row count
        res = connect.get_collection_info(collection_name)
-        assert res['segment_row_count'] == get_segment_row_count
+        assert res['segment_row_limit'] == get_segment_row_count

    def test_get_collection_info_after_index_created(self, connect, collection, get_simple_index):
        connect.create_index(collection, field_name, get_simple_index)
@@ -163,7 +163,7 @@ class TestInfoBase:
        collection_name = gen_unique_str(collection_id)
        fields = {
            "fields": [filter_field, vector_field],
-            "segment_row_count": segment_row_count
+            "segment_row_limit": segment_row_count
        }
        connect.create_collection(collection_name, fields)
        entities = gen_entities_by_fields(fields["fields"], nb, vector_field["params"]["dim"])
@@ -171,7 +171,7 @@ class TestInfoBase:
        connect.flush([collection_name])
        res = connect.get_collection_info(collection_name)
        assert res['auto_id'] == True
-        assert res['segment_row_count'] == segment_row_count
+        assert res['segment_row_limit'] == segment_row_count
        assert len(res["fields"]) == 2
        for field in res["fields"]:
            if field["type"] == filter_field:
@@ -188,14 +188,14 @@ class TestInfoBase:
        '''
        collection_name = gen_unique_str(collection_id)
        fields = copy.deepcopy(default_fields)
-        fields["segment_row_count"] = get_segment_row_count
+        fields["segment_row_limit"] = get_segment_row_count
        connect.create_collection(collection_name, fields)
        entities = gen_entities_by_fields(fields["fields"], nb, fields["fields"][-1]["params"]["dim"])
        res_ids = connect.insert(collection_name, entities)
        connect.flush([collection_name])
        res = connect.get_collection_info(collection_name)
        assert res['auto_id'] == True
-        assert res['segment_row_count'] == get_segment_row_count
+        assert res['segment_row_limit'] == get_segment_row_count

class TestInfoInvalid(object):

View File

@@ -203,7 +203,7 @@ class TestInsertBase:
        collection_name = gen_unique_str("test_collection")
        fields = {
            "fields": [filter_field, vector_field],
-            "segment_row_count": segment_row_count,
+            "segment_row_limit": segment_row_count,
            "auto_id": True
        }
        connect.create_collection(collection_name, fields)
@@ -278,7 +278,7 @@ class TestInsertBase:
        collection_name = gen_unique_str("test_collection")
        fields = {
            "fields": [filter_field, vector_field],
-            "segment_row_count": segment_row_count
+            "segment_row_limit": segment_row_count
        }
        connect.create_collection(collection_name, fields)
        entities = gen_entities_by_fields(fields["fields"], nb, dim)

View File

@@ -224,11 +224,10 @@ class TestListIdInSegmentBase:
        get_simple_index["metric_type"] = "IP"
        ids, seg_id = get_segment_id(connect, collection, nb=nb, index_params=get_simple_index)
        vector_ids = connect.list_id_in_segment(collection, seg_id)
-        # TODO:
-        segment_row_count = connect.get_collection_info(collection)["segment_row_count"]
+        # TODO:
+        segment_row_count = connect.get_collection_info(collection)["segment_row_limit"]
        assert vector_ids[0:segment_row_count] == ids[0:segment_row_count]

class TestListIdInSegmentBinary:
    """
    ******************************************************************

View File

@@ -150,7 +150,7 @@ class TestFlushBase:
        collection_new = gen_unique_str("test_flush")
        fields = {
            "fields": [filter_field, vector_field],
-            "segment_row_count": segment_row_count,
+            "segment_row_limit": segment_row_count,
            "auto_id": False
        }
        connect.create_collection(collection_new, fields)

View File

@@ -228,7 +228,7 @@ def gen_default_fields(auto_id=True):
            {"field": "float", "type": DataType.FLOAT},
            {"field": default_float_vec_field_name, "type": DataType.FLOAT_VECTOR, "params": {"dim": dim}},
        ],
-        "segment_row_count": segment_row_count,
+        "segment_row_limit": segment_row_count,
        "auto_id" : auto_id
    }
    return default_fields
@@ -241,7 +241,7 @@ def gen_binary_default_fields(auto_id=True):
            {"field": "float", "type": DataType.FLOAT},
            {"field": default_binary_vec_field_name, "type": DataType.BINARY_VECTOR, "params": {"dim": dim}}
        ],
-        "segment_row_count": segment_row_count,
+        "segment_row_limit": segment_row_count,
        "auto_id" : auto_id
    }
    return default_fields