mirror of https://gitee.com/milvus-io/milvus.git
parent c6d0bfb3e5
commit 5ff0070bf3
@@ -76,26 +76,10 @@ storage:
 #                      | files in advance before implementing data changes. WAL     |            |                 |
 #                      | ensures the atomicity and durability for Milvus operations.|            |                 |
 #----------------------+------------------------------------------------------------+------------+-----------------+
-# recovery_error_ignore| Whether to ignore logs with errors that happens during WAL | Boolean    | false           |
-#                      | recovery. If true, when Milvus restarts for recovery and   |            |                 |
-#                      | there are errors in WAL log files, log files with errors   |            |                 |
-#                      | are ignored. If false, Milvus does not restart when there  |            |                 |
-#                      | are errors in WAL log files.                               |            |                 |
-#----------------------+------------------------------------------------------------+------------+-----------------+
-# buffer_size          | Sum total of the read buffer and the write buffer in Bytes.| String     | 256MB           |
-#                      | buffer_size must be in range [64MB, 4096MB].               |            |                 |
-#                      | If the value you specified is out of range, Milvus         |            |                 |
-#                      | automatically uses the boundary value closest to the       |            |                 |
-#                      | specified value. It is recommended you set buffer_size to  |            |                 |
-#                      | a value greater than the inserted data size of a single    |            |                 |
-#                      | insert operation for better performance.                   |            |                 |
-#----------------------+------------------------------------------------------------+------------+-----------------+
-# path                 | Location of WAL log files.                                 | String     |                 |
-#----------------------+------------------------------------------------------------+------------+-----------------+
 wal:
   enable: true
   recovery_error_ignore: false
   buffer_size: 256MB
   path: @MILVUS_DB_PATH@/wal

 #----------------------+------------------------------------------------------------+------------+-----------------+
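
Note on the buffer_size rule in the comment block removed above: an out-of-range value snaps to the nearest boundary of [64MB, 4096MB], which is a plain clamp. A minimal sketch of that behavior; the helper name is hypothetical, not Milvus code:

    #include <algorithm>
    #include <cstdint>

    constexpr int64_t MB = 1LL << 20;

    // Hypothetical helper mirroring the documented rule: values below 64MB
    // become 64MB, values above 4096MB become 4096MB, in-range values pass through.
    int64_t ClampWalBufferSize(int64_t requested_bytes) {
        constexpr int64_t kMinBufferSize = 64 * MB;
        constexpr int64_t kMaxBufferSize = 4096 * MB;
        return std::clamp(requested_bytes, kMinBufferSize, kMaxBufferSize);
    }
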
@@ -160,8 +160,8 @@ VectorIndexFormat::ConstructIndex(const std::string& index_name, knowhere::Binar
     }

     if (compress_data != nullptr) {
-        LOG_ENGINE_DEBUG_ << "load index with " << SQ8_DATA << " " << compress_data->size;
-        index_data.Append(SQ8_DATA, compress_data);
+        LOG_ENGINE_DEBUG_ << "load index with " << QUANTIZATION_DATA << " " << compress_data->size;
+        index_data.Append(QUANTIZATION_DATA, compress_data);
         length += compress_data->size;
     }

@@ -217,7 +217,7 @@ VectorIndexFormat::WriteCompress(const storage::FSHandlerPtr& fs_ptr, const std:

     auto binaryset = index->Serialize(knowhere::Config());

-    auto sq8_data = binaryset.Erase(SQ8_DATA);
+    auto sq8_data = binaryset.Erase(QUANTIZATION_DATA);
     if (sq8_data != nullptr) {
         auto& ss_codec = codec::Codec::instance();
         ss_codec.GetVectorCompressFormat()->Write(fs_ptr, file_path, sq8_data);
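
The two hunks above are a pure key rename inside knowhere's BinarySet, which maps blob names to serialized index components. A self-contained sketch of that container pattern, using simplified stand-in types rather than the real knowhere API:

    #include <cstdint>
    #include <map>
    #include <memory>
    #include <string>
    #include <vector>

    using Blob = std::shared_ptr<std::vector<uint8_t>>;

    // Simplified stand-in for knowhere::BinarySet: components are keyed by name,
    // so renaming "SQ8_DATA" to "QUANTIZATION_DATA" changes only the lookup key,
    // not the serialized payload.
    struct BinarySetSketch {
        std::map<std::string, Blob> data;

        void Append(const std::string& name, Blob blob) {
            data[name] = std::move(blob);
        }

        // Returns and removes the named blob, or nullptr if absent, matching
        // how WriteCompress checks the Erase() result against nullptr.
        Blob Erase(const std::string& name) {
            auto it = data.find(name);
            if (it == data.end()) {
                return nullptr;
            }
            Blob blob = std::move(it->second);
            data.erase(it);
            return blob;
        }
    };
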
@@ -21,16 +21,16 @@ constexpr int64_t MB = 1LL << 20;
 constexpr int64_t GB = 1LL << 30;
 constexpr int64_t TB = 1LL << 40;

-constexpr int64_t MAX_MEM_SEGMENT_SIZE = 128 * MB;
+constexpr int64_t MAX_MEM_SEGMENT_SIZE = 128 * MB;  // max data size of one segment in insert buffer

-constexpr int64_t MAX_NAME_LENGTH = 255;
-constexpr int64_t MAX_DIMENSION = 32768;
-constexpr int32_t MAX_SEGMENT_ROW_COUNT = 4 * 1024 * 1024;
-constexpr int64_t DEFAULT_SEGMENT_ROW_COUNT = 100000;  // default row count per segment when creating collection
-constexpr int64_t MAX_INSERT_DATA_SIZE = 256 * MB;
-constexpr int64_t MAX_WAL_FILE_SIZE = 256 * MB;
+constexpr int64_t MAX_NAME_LENGTH = 255;                     // max string length for collection/partition/field name
+constexpr int64_t MAX_DIMENSION = 32768;                     // max dimension of vector field
+constexpr int32_t MAX_SEGMENT_ROW_COUNT = 4 * 1024 * 1024;   // max row count of one segment
+constexpr int64_t DEFAULT_SEGMENT_ROW_COUNT = 512 * 1024;    // default row count per segment when creating collection
+constexpr int64_t MAX_INSERT_DATA_SIZE = 256 * MB;           // max data size in one insert action
+constexpr int64_t MAX_WAL_FILE_SIZE = 256 * MB;              // max file size of wal file

-constexpr int64_t BUILD_INEDX_RETRY_TIMES = 3;
+constexpr int64_t BUILD_INEDX_RETRY_TIMES = 3;  // retry times if build index failed

 } // namespace engine
 } // namespace milvus
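
A quick sizing check on the new default (my own arithmetic; the 128-dim figure is taken from the Python test at the end of this diff): 512 * 1024 rows of a 128-dimensional float vector come to exactly 256 MB of raw vector data, the same bound as MAX_INSERT_DATA_SIZE and MAX_WAL_FILE_SIZE above:

    #include <cstdint>
    #include <iostream>

    constexpr int64_t MB = 1LL << 20;

    int main() {
        constexpr int64_t rows = 512 * 1024;                   // new DEFAULT_SEGMENT_ROW_COUNT
        constexpr int64_t dim = 128;                           // example dimension from the test file
        constexpr int64_t bytes = rows * dim * sizeof(float);  // raw float32 vector data
        std::cout << bytes / MB << " MB\n";                    // prints "256 MB"
        return 0;
    }
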
@@ -61,10 +61,10 @@ SetSnapshotIndex(const std::string& collection_name, const std::string& field_na
     json[engine::PARAM_INDEX_EXTRA_PARAMS] = index_info.extra_params_;
     index_element->SetParams(json);

-    if (index_info.index_name_ == knowhere::IndexEnum::INDEX_RHNSWSQ) {
+    if (utils::RequireCompressFile(index_info.index_type_)) {
         auto compress_element =
             std::make_shared<snapshot::FieldElement>(ss->GetCollectionId(), field->GetID(), ELEMENT_INDEX_COMPRESS,
-                                                     milvus::engine::FieldElementType::FET_COMPRESS_SQ8);
+                                                     milvus::engine::FieldElementType::FET_COMPRESS);
         ss_context.new_field_elements.push_back(compress_element);
     }
 }
@@ -135,7 +135,7 @@ DeleteSnapshotIndex(const std::string& collection_name, const std::string& field
     std::vector<snapshot::FieldElementPtr> elements = ss->GetFieldElementsByField(name);
     for (auto& element : elements) {
         if (element->GetFEtype() == engine::FieldElementType::FET_INDEX ||
-            element->GetFEtype() == engine::FieldElementType::FET_COMPRESS_SQ8) {
+            element->GetFEtype() == engine::FieldElementType::FET_COMPRESS) {
             snapshot::OperationContext context;
             context.stale_field_elements.push_back(element);
             auto op = std::make_shared<snapshot::DropAllIndexOperation>(context, ss);
@@ -85,7 +85,7 @@ enum class FieldElementType {
     FET_BLOOM_FILTER = 2,
     FET_DELETED_DOCS = 3,
     FET_INDEX = 4,
-    FET_COMPRESS_SQ8 = 5,
+    FET_COMPRESS = 5,
 };

 ///////////////////////////////////////////////////////////////////////////////////////////////////
@@ -169,6 +169,17 @@ GetSizeOfChunk(const engine::DataChunkPtr& chunk) {
     return total_size;
 }

+bool
+RequireRawFile(const std::string& index_type) {
+    return index_type == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT || index_type == knowhere::IndexEnum::INDEX_NSG ||
+           index_type == knowhere::IndexEnum::INDEX_HNSW;
+}
+
+bool
+RequireCompressFile(const std::string& index_type) {
+    return index_type == knowhere::IndexEnum::INDEX_RHNSWSQ;
+}
+
 } // namespace utils
 } // namespace engine
 } // namespace milvus
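
The new RequireRawFile/RequireCompressFile helpers centralize index-type checks that were previously inlined at each call site (see the SegmentReader and ExecutionEngineImpl hunks below). A self-contained sketch of the pattern, with the index-type strings written as literals; these are assumed values of the knowhere::IndexEnum constants, used here only for illustration:

    #include <string>

    // Assumed string values; the real code compares against knowhere::IndexEnum constants.
    bool RequireRawFile(const std::string& index_type) {
        return index_type == "IVF_FLAT" || index_type == "NSG" || index_type == "HNSW";
    }

    bool RequireCompressFile(const std::string& index_type) {
        return index_type == "RHNSW_SQ";
    }

    // Callers branch on the predicate instead of repeating the type list, so
    // supporting a new index type means editing one function, not every call site.
    void LoadIndexSketch(const std::string& index_type) {
        if (RequireRawFile(index_type)) {
            // read the raw vector file alongside the index
        }
        if (RequireCompressFile(index_type)) {
            // read the quantization (compress) file alongside the index
        }
    }
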
@@ -58,6 +58,12 @@ GetIDFromChunk(const engine::DataChunkPtr& chunk, engine::IDNumbers& ids);
 int64_t
 GetSizeOfChunk(const engine::DataChunkPtr& chunk);

+bool
+RequireRawFile(const std::string& index_type);
+
+bool
+RequireCompressFile(const std::string& index_type);
+
 } // namespace utils
 } // namespace engine
 } // namespace milvus
@@ -728,8 +728,8 @@ ExecutionEngineImpl::CreateSnapshotIndexFile(AddSegmentFileOperation& operation,

     // create snapshot compress file
     std::string index_name = index_element->GetName();
-    if (index_name == knowhere::IndexEnum::INDEX_RHNSWSQ) {
-        auto compress_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_COMPRESS_SQ8);
+    if (utils::RequireCompressFile(index_info.index_type_)) {
+        auto compress_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_COMPRESS);
         if (compress_visitor == nullptr) {
             return Status(DB_ERROR,
                           "Could not build index: compress element not exist");  // something wrong in CreateIndex
@@ -27,7 +27,7 @@ namespace knowhere {

 #define INDEX_DATA "INDEX_DATA"
 #define RAW_DATA "RAW_DATA"
-#define SQ8_DATA "SQ8_DATA"
+#define QUANTIZATION_DATA "QUANTIZATION_DATA"

 class VecIndex : public Index {
  public:
@@ -25,11 +25,11 @@
 #include "codecs/Codec.h"
 #include "db/SnapshotUtils.h"
 #include "db/Types.h"
 #include "db/Utils.h"
 #include "db/snapshot/ResourceHelper.h"
 #include "knowhere/index/vector_index/VecIndex.h"
 #include "knowhere/index/vector_index/VecIndexFactory.h"
 #include "knowhere/index/vector_index/adapter/VectorAdapter.h"
 #include "knowhere/index/vector_index/helpers/IndexParameter.h"
 #include "storage/disk/DiskIOReader.h"
 #include "storage/disk/DiskIOWriter.h"
 #include "storage/disk/DiskOperation.h"
@@ -342,8 +342,7 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex

     // for some kinds index(IVF), read raw file
     auto index_type = index_visitor->GetElement()->GetTypeName();
-    if (index_type == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT || index_type == knowhere::IndexEnum::INDEX_NSG ||
-        index_type == knowhere::IndexEnum::INDEX_HNSW) {
+    if (engine::utils::RequireRawFile(index_type)) {
         engine::BinaryDataPtr fixed_data;
         auto status = segment_ptr_->GetFixedFieldData(field_name, fixed_data);
         if (status.ok()) {
@@ -355,9 +354,9 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
         }
     }

-    // for some kinds index(SQ8), read compress file
-    if (index_type == knowhere::IndexEnum::INDEX_RHNSWSQ) {
-        if (auto visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_COMPRESS_SQ8)) {
+    // for some kinds index(RHNSWSQ), read compress file
+    if (engine::utils::RequireCompressFile(index_type)) {
+        if (auto visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_COMPRESS)) {
             auto file_path =
                 engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, visitor->GetFile());
             ss_codec.GetVectorIndexFormat()->ReadCompress(fs_ptr_, file_path, compress_data);
@@ -401,7 +401,7 @@ SegmentWriter::WriteVectorIndex(const std::string& field_name) {

     // serialize compress file
     {
-        auto element_visitor = field->GetElementVisitor(engine::FieldElementType::FET_COMPRESS_SQ8);
+        auto element_visitor = field->GetElementVisitor(engine::FieldElementType::FET_COMPRESS);
         if (element_visitor && element_visitor->GetFile()) {
             auto segment_file = element_visitor->GetFile();
             std::string file_path =
@@ -64,7 +64,7 @@ DescribeIndexReq::OnExecute() {
         }
     }

-    json_params_[engine::PARAM_INDEX_TYPE] = index.index_name_;
+    json_params_[engine::PARAM_INDEX_TYPE] = index.index_type_;
     json_params_[engine::PARAM_INDEX_METRIC_TYPE] = index.metric_name_;
     json_params_[engine::PARAM_INDEX_EXTRA_PARAMS] = index.extra_params_;
 } catch (std::exception& ex) {
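
The DescribeIndexReq change above fixes a genuine bug: PARAM_INDEX_TYPE was being filled from the index's user-assigned name rather than its algorithm type. A minimal illustration of the two fields being confused; the struct is illustrative, and only the member names come from the diff:

    #include <string>

    struct IndexInfoSketch {
        std::string index_name_;  // user-assigned identifier for the index
        std::string index_type_;  // algorithm type, e.g. "IVF_FLAT" or "RHNSW_SQ"
    };

    // Correct: a "describe index" response reports the algorithm type,
    // not the name the user happened to give the index.
    std::string DescribeIndexType(const IndexInfoSketch& index) {
        return index.index_type_;
    }
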
@@ -32,7 +32,7 @@ constexpr int64_t NQ = 5;
 constexpr int64_t TOP_K = 10;
 constexpr int64_t NPROBE = 32;
 constexpr int64_t SEARCH_TARGET = BATCH_ENTITY_COUNT / 2;  // change this value, result is different
-constexpr int64_t ADD_ENTITY_LOOP = 1;
+constexpr int64_t ADD_ENTITY_LOOP = 10;
 constexpr milvus::IndexType INDEX_TYPE = milvus::IndexType::IVFFLAT;
 constexpr int32_t NLIST = 16384;
 const char* PARTITION_TAG = "part";
@@ -13,7 +13,7 @@ from utils import *
 nb = 1
 dim = 128
 collection_id = "create_collection"
-default_segment_row_count = 100000
+default_segment_row_count = 512 * 1024
 drop_collection_interval_time = 3
 segment_row_count = 5000
 default_fields = gen_default_fields()