enhance:[cherry-pick] Use binlog index for better search performance (#29012)

this pr is cherry-pick from master:
pr: https://github.com/milvus-io/milvus/pull/28528
pr: https://github.com/milvus-io/milvus/pull/27673
related issue:
issue: https://github.com/milvus-io/milvus/issues/27678

Signed-off-by: cqy123456 <qianya.cheng@zilliz.com>
This commit is contained in:
cqy123456 2023-12-07 09:52:34 +08:00 committed by GitHub
parent cdd1305f26
commit 8fd38c8eea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 591 additions and 89 deletions

View File

@ -287,10 +287,11 @@ queryNode:
# This parameter is only useful when enable-disk = true.
# And this value should be a number greater than 1 and less than 32.
chunkRows: 1024 # The number of vectors in a chunk.
growing: # growing a vector index for growing segment to accelerate search
interimIndex: # build a temporary vector index for growing segments or loaded binlog data to accelerate search
enableIndex: true
nlist: 128 # growing segment index nlist
nprobe: 16 # nprobe to search growing segment, based on your accuracy requirement, must smaller than nlist
nlist: 128 # segment index nlist
nprobe: 16 # nprobe to search segment, based on your accuracy requirement, must be smaller than nlist
memExpansionRate: 1.15 # the ratio of building interim index memory usage to raw data
loadMemoryUsageFactor: 1 # The multiply factor of calculating the memory usage while loading segments
enableDisk: false # enable querynode load disk index, and search on disk index
maxDiskUsagePercentage: 95

View File

@ -31,8 +31,10 @@ VectorFieldIndexing::VectorFieldIndexing(const FieldMeta& field_meta,
: FieldIndexing(field_meta, segcore_config),
build(false),
sync_with_index(false),
config_(std::make_unique<VecIndexConfig>(
segment_max_row_count, field_index_meta, segcore_config)) {
config_(std::make_unique<VecIndexConfig>(segment_max_row_count,
field_index_meta,
segcore_config,
SegmentType::Growing)) {
index_ = std::make_unique<index::VectorMemIndex>(
config_->GetIndexType(),
config_->GetMetricType(),

View File

@ -245,7 +245,7 @@ class IndexingRecord {
for (auto& [field_id, field_meta] : schema_.get_fields()) {
++offset_id;
if (field_meta.is_vector() &&
segcore_config_.get_enable_growing_segment_index()) {
segcore_config_.get_enable_interim_segment_index()) {
// TODO: skip binary small index now, reenable after config.yaml is ready
if (field_meta.get_data_type() == DataType::VECTOR_BINARY) {
continue;

View File

@ -13,15 +13,15 @@
#include "log/Log.h"
namespace milvus::segcore {
VecIndexConfig::VecIndexConfig(const int64_t max_index_row_cout,
const FieldIndexMeta& index_meta_,
const SegcoreConfig& config)
const SegcoreConfig& config,
const SegmentType& segment_type)
: max_index_row_count_(max_index_row_cout), config_(config) {
origin_index_type_ = index_meta_.GetIndexType();
metric_type_ = index_meta_.GeMetricType();
index_type_ = support_index_types[0];
index_type_ = support_index_types.at(segment_type);
build_params_[knowhere::meta::METRIC_TYPE] = metric_type_;
build_params_[knowhere::indexparam::NLIST] =
std::to_string(config_.get_nlist());

View File

@ -16,6 +16,7 @@
#include "knowhere/config.h"
#include "SegcoreConfig.h"
#include "common/QueryInfo.h"
#include "common/type_c.h"
namespace milvus::segcore {
@ -27,8 +28,9 @@ enum class IndexConfigLevel {
};
class VecIndexConfig {
inline static const std::vector<std::string> support_index_types = {
knowhere::IndexEnum::INDEX_FAISS_IVFFLAT_CC};
inline static const std::map<SegmentType, std::string> support_index_types =
{{SegmentType::Growing, knowhere::IndexEnum::INDEX_FAISS_IVFFLAT_CC},
{SegmentType::Sealed, knowhere::IndexEnum::INDEX_FAISS_IVFFLAT_CC}};
inline static const std::map<std::string, double> index_build_ratio = {
{knowhere::IndexEnum::INDEX_FAISS_IVFFLAT_CC, 0.1}};
@ -39,7 +41,8 @@ class VecIndexConfig {
public:
VecIndexConfig(const int64_t max_index_row_count,
const FieldIndexMeta& index_meta_,
const SegcoreConfig& config);
const SegcoreConfig& config,
const SegmentType& segment_type);
int64_t
GetBuildThreshold() const noexcept;

View File

@ -64,20 +64,20 @@ class SegcoreConfig {
}
void
set_enable_growing_segment_index(bool enable_growing_segment_index) {
enable_growing_segment_index_ = enable_growing_segment_index;
set_enable_interim_segment_index(bool enable_interim_segment_index) {
this->enable_interim_segment_index_ = enable_interim_segment_index;
}
bool
get_enable_growing_segment_index() const {
return enable_growing_segment_index_;
get_enable_interim_segment_index() const {
return enable_interim_segment_index_;
}
private:
bool enable_growing_segment_index_ = false;
int64_t chunk_rows_ = 32 * 1024;
int64_t nlist_ = 100;
int64_t nprobe_ = 4;
inline static bool enable_interim_segment_index_ = false;
inline static int64_t chunk_rows_ = 32 * 1024;
inline static int64_t nlist_ = 100;
inline static int64_t nprobe_ = 4;
};
} // namespace milvus::segcore

View File

@ -121,7 +121,7 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset,
field_meta);
}
//insert vector data into index
if (segcore_config_.get_enable_growing_segment_index()) {
if (segcore_config_.get_enable_interim_segment_index()) {
indexing_record_.AppendingIndex(
reserved_offset,
num_rows,
@ -204,7 +204,7 @@ SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) {
insert_record_.get_field_data_base(field_id)->set_data_raw(
reserved_offset, field_data);
}
if (segcore_config_.get_enable_growing_segment_index()) {
if (segcore_config_.get_enable_interim_segment_index()) {
auto offset = reserved_offset;
for (auto& data : field_data) {
auto row_count = data->get_num_rows();

View File

@ -43,6 +43,7 @@
#include "storage/ChunkCacheSingleton.h"
#include "common/File.h"
#include "common/Tracer.h"
#include "index/VectorMemIndex.h"
namespace milvus::segcore {
@ -99,17 +100,19 @@ SegmentSealedImpl::LoadVecIndex(const LoadIndexInfo& info) {
") than other column's row count (" +
std::to_string(num_rows_.value()) + ")");
}
AssertInfo(!vector_indexings_.is_ready(field_id), "vec index is not ready");
if (get_bit(field_data_ready_bitset_, field_id)) {
fields_.erase(field_id);
set_bit(field_data_ready_bitset_, field_id, false);
} else if (get_bit(binlog_index_bitset_, field_id)) {
set_bit(binlog_index_bitset_, field_id, false);
vector_indexings_.drop_field_indexing(field_id);
}
update_row_count(row_count);
vector_indexings_.append_field_indexing(
field_id,
metric_type,
std::move(const_cast<LoadIndexInfo&>(info).index));
set_bit(index_ready_bitset_, field_id, true);
update_row_count(row_count);
// release field column
fields_.erase(field_id);
set_bit(field_data_ready_bitset_, field_id, false);
}
void
@ -370,11 +373,29 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
insert_record_.seal_pks();
}
std::unique_lock lck(mutex_);
set_bit(field_data_ready_bitset_, field_id, true);
bool use_temp_index = false;
{
// update num_rows to build temperate binlog index
std::unique_lock lck(mutex_);
update_row_count(num_rows);
}
if (generate_binlog_index(field_id)) {
std::unique_lock lck(mutex_);
fields_.erase(field_id);
set_bit(field_data_ready_bitset_, field_id, false);
use_temp_index = true;
}
if (!use_temp_index) {
std::unique_lock lck(mutex_);
set_bit(field_data_ready_bitset_, field_id, true);
}
}
{
std::unique_lock lck(mutex_);
update_row_count(num_rows);
}
std::unique_lock lck(mutex_);
update_row_count(num_rows);
}
void
@ -613,7 +634,26 @@ SegmentSealedImpl::vector_search(SearchInfo& search_info,
AssertInfo(field_meta.is_vector(),
"The meta type of vector field is not vector type");
if (get_bit(index_ready_bitset_, field_id)) {
if (get_bit(binlog_index_bitset_, field_id)) {
AssertInfo(
vec_binlog_config_.find(field_id) != vec_binlog_config_.end(),
"The binlog params is not generate.");
auto binlog_search_info =
vec_binlog_config_.at(field_id)->GetSearchConf(search_info);
AssertInfo(vector_indexings_.is_ready(field_id),
"vector indexes isn't ready for field " +
std::to_string(field_id.get()));
query::SearchOnSealedIndex(*schema_,
vector_indexings_,
binlog_search_info,
query_data,
query_count,
bitset,
output);
milvus::tracer::AddEvent(
"finish_searching_vector_temperate_binlog_index");
} else if (get_bit(index_ready_bitset_, field_id)) {
AssertInfo(vector_indexings_.is_ready(field_id),
"vector indexes isn't ready for field " +
std::to_string(field_id.get()));
@ -680,7 +720,8 @@ SegmentSealedImpl::get_vector(FieldId field_id,
auto& field_meta = schema_->operator[](field_id);
AssertInfo(field_meta.is_vector(), "vector field is not vector type");
if (!get_bit(index_ready_bitset_, field_id)) {
if (!get_bit(index_ready_bitset_, field_id) &&
!get_bit(binlog_index_bitset_, field_id)) {
return fill_with_empty(field_id, count);
}
@ -774,8 +815,14 @@ SegmentSealedImpl::DropFieldData(const FieldId field_id) {
} else {
auto& field_meta = schema_->operator[](field_id);
std::unique_lock lck(mutex_);
set_bit(field_data_ready_bitset_, field_id, false);
insert_record_.drop_field_data(field_id);
if (get_bit(field_data_ready_bitset_, field_id)) {
set_bit(field_data_ready_bitset_, field_id, false);
insert_record_.drop_field_data(field_id);
}
if (get_bit(binlog_index_bitset_, field_id)) {
set_bit(binlog_index_bitset_, field_id, false);
vector_indexings_.drop_field_indexing(field_id);
}
lck.unlock();
}
}
@ -810,7 +857,8 @@ SegmentSealedImpl::check_search(const query::Plan* plan) const {
}
auto& request_fields = plan->extra_info_opt_.value().involved_fields_;
auto field_ready_bitset = field_data_ready_bitset_ | index_ready_bitset_;
auto field_ready_bitset =
field_data_ready_bitset_ | index_ready_bitset_ | binlog_index_bitset_;
AssertInfo(request_fields.size() == field_ready_bitset.size(),
"Request fields size not equal to field ready bitset size when "
"check search");
@ -826,13 +874,19 @@ SegmentSealedImpl::check_search(const query::Plan* plan) const {
}
}
SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema, int64_t segment_id)
: field_data_ready_bitset_(schema->size()),
SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema,
IndexMetaPtr index_meta,
const SegcoreConfig& segcore_config,
int64_t segment_id)
: segcore_config_(segcore_config),
field_data_ready_bitset_(schema->size()),
index_ready_bitset_(schema->size()),
binlog_index_bitset_(schema->size()),
scalar_indexings_(schema->size()),
insert_record_(*schema, MAX_ROW_COUNT),
schema_(schema),
id_(segment_id) {
id_(segment_id),
col_index_meta_(index_meta) {
}
SegmentSealedImpl::~SegmentSealedImpl() {
@ -1135,7 +1189,8 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id,
bool
SegmentSealedImpl::HasIndex(FieldId field_id) const {
std::shared_lock lck(mutex_);
return get_bit(index_ready_bitset_, field_id);
return get_bit(index_ready_bitset_, field_id) |
get_bit(binlog_index_bitset_, field_id);
}
bool
@ -1154,7 +1209,8 @@ SegmentSealedImpl::HasRawData(int64_t field_id) const {
auto fieldID = FieldId(field_id);
const auto& field_meta = schema_->operator[](fieldID);
if (datatype_is_vector(field_meta.get_data_type())) {
if (get_bit(index_ready_bitset_, fieldID)) {
if (get_bit(index_ready_bitset_, fieldID) |
get_bit(binlog_index_bitset_, fieldID)) {
AssertInfo(vector_indexings_.is_ready(fieldID),
"vector index is not ready");
auto field_indexing = vector_indexings_.get_field_indexing(fieldID);
@ -1303,4 +1359,63 @@ SegmentSealedImpl::mask_with_timestamps(BitsetType& bitset_chunk,
bitset_chunk |= mask;
}
bool
SegmentSealedImpl::generate_binlog_index(const FieldId field_id) {
if (col_index_meta_ == nullptr)
return false;
auto& field_meta = schema_->operator[](field_id);
if (field_meta.is_vector() &&
field_meta.get_data_type() == DataType::VECTOR_FLOAT &&
segcore_config_.get_enable_interim_segment_index()) {
try {
auto& field_index_meta =
col_index_meta_->GetFieldIndexMeta(field_id);
auto& index_params = field_index_meta.GetIndexParams();
if (index_params.find(knowhere::meta::INDEX_TYPE) ==
index_params.end() ||
index_params.at(knowhere::meta::INDEX_TYPE) ==
knowhere::IndexEnum::INDEX_FAISS_IDMAP) {
return false;
}
// get binlog data and meta
auto row_count = num_rows_.value();
auto dim = field_meta.get_dim();
auto vec_data = fields_.at(field_id);
auto dataset =
knowhere::GenDataSet(row_count, dim, (void*)vec_data->Data());
dataset->SetIsOwner(false);
// generate index params
auto field_binlog_config = std::unique_ptr<VecIndexConfig>(
new VecIndexConfig(row_count,
field_index_meta,
segcore_config_,
SegmentType::Sealed));
auto build_config = field_binlog_config->GetBuildBaseParams();
build_config[knowhere::meta::DIM] = std::to_string(dim);
build_config[knowhere::meta::NUM_BUILD_THREAD] = std::to_string(1);
auto index_metric = field_binlog_config->GetMetricType();
index::IndexBasePtr vec_index =
std::make_unique<index::VectorMemIndex>(
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber());
vec_index->BuildWithDataset(dataset, build_config);
vector_indexings_.append_field_indexing(
field_id, index_metric, std::move(vec_index));
{
std::unique_lock lck(mutex_);
vec_binlog_config_[field_id] = std::move(field_binlog_config);
set_bit(binlog_index_bitset_, field_id, true);
}
return true;
} catch (std::exception& e) {
return false;
}
} else {
return false;
}
}
} // namespace milvus::segcore

View File

@ -34,12 +34,16 @@
#include "index/ScalarIndex.h"
#include "sys/mman.h"
#include "common/Types.h"
#include "common/IndexMeta.h"
namespace milvus::segcore {
class SegmentSealedImpl : public SegmentSealed {
public:
explicit SegmentSealedImpl(SchemaPtr schema, int64_t segment_id);
explicit SegmentSealedImpl(SchemaPtr schema,
IndexMetaPtr index_meta,
const SegcoreConfig& segcore_config,
int64_t segment_id);
~SegmentSealedImpl() override;
void
LoadIndex(const LoadIndexInfo& info) override;
@ -249,10 +253,14 @@ class SegmentSealedImpl : public SegmentSealed {
void
LoadScalarIndex(const LoadIndexInfo& info);
bool
generate_binlog_index(const FieldId field_id);
private:
// segment loading state
BitsetType field_data_ready_bitset_;
BitsetType index_ready_bitset_;
BitsetType binlog_index_bitset_;
std::atomic<int> system_ready_count_ = 0;
// segment data
@ -275,11 +283,22 @@ class SegmentSealedImpl : public SegmentSealed {
SchemaPtr schema_;
int64_t id_;
std::unordered_map<FieldId, std::shared_ptr<ColumnBase>> fields_;
// only useful in binlog
IndexMetaPtr col_index_meta_;
SegcoreConfig segcore_config_;
std::unordered_map<FieldId, std::unique_ptr<VecIndexConfig>>
vec_binlog_config_;
};
inline SegmentSealedPtr
CreateSealedSegment(SchemaPtr schema, int64_t segment_id = -1) {
return std::make_unique<SegmentSealedImpl>(schema, segment_id);
CreateSealedSegment(
SchemaPtr schema,
IndexMetaPtr index_meta = nullptr,
int64_t segment_id = -1,
const SegcoreConfig& segcore_config = SegcoreConfig::default_config()) {
return std::make_unique<SegmentSealedImpl>(
schema, index_meta, segcore_config, segment_id);
}
} // namespace milvus::segcore

View File

@ -32,10 +32,10 @@ SegcoreSetChunkRows(const int64_t value) {
}
extern "C" void
SegcoreSetEnableGrowingSegmentIndex(const bool value) {
SegcoreSetEnableInterimSegmentIndex(const bool value) {
milvus::segcore::SegcoreConfig& config =
milvus::segcore::SegcoreConfig::default_config();
config.set_enable_growing_segment_index(value);
config.set_enable_interim_segment_index(value);
}
extern "C" void

View File

@ -22,7 +22,7 @@ void
SegcoreSetChunkRows(const int64_t);
void
SegcoreSetEnableGrowingSegmentIndex(const bool);
SegcoreSetEnableInterimSegmentIndex(const bool);
void
SegcoreSetNlist(const int64_t);

View File

@ -41,8 +41,8 @@ NewSegment(CCollection collection, SegmentType seg_type, int64_t segment_id) {
}
case Sealed:
case Indexing:
segment = milvus::segcore::CreateSealedSegment(col->get_schema(),
segment_id);
segment = milvus::segcore::CreateSealedSegment(
col->get_schema(), col->GetIndexMeta(), segment_id);
break;
default:
LOG_SEGCORE_ERROR_ << "invalid segment type "

View File

@ -58,6 +58,7 @@ set(MILVUS_TEST_FILES
test_plan_proto.cpp
test_chunk_cache.cpp
test_storage.cpp
test_binlog_index.cpp
)
if ( BUILD_DISK_ANN STREQUAL "ON" )

View File

@ -75,7 +75,7 @@ Search_GrowingIndex(benchmark::State& state) {
FieldIndexMeta fieldIndexMeta(schema->get_field_id(FieldName("fakevec")),
std::move(index_params),
std::move(type_params));
segconf.set_enable_growing_segment_index(true);
segconf.set_enable_interim_segment_index(true);
std::map<FieldId, FieldIndexMeta> filedMap = {
{schema->get_field_id(FieldName("fakevec")), fieldIndexMeta}};
IndexMetaPtr metaPtr =

View File

@ -0,0 +1,326 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <gtest/gtest.h>
#include <boost/format.hpp>
#include <regex>
#include "pb/plan.pb.h"
#include "segcore/segcore_init_c.h"
#include "segcore/SegmentSealed.h"
#include "segcore/SegmentSealedImpl.h"
#include "pb/schema.pb.h"
#include "test_utils/DataGen.h"
#include "index/IndexFactory.h"
#include "query/Plan.h"
#include "knowhere/comp/brute_force.h"
using namespace milvus::segcore;
using namespace milvus;
namespace pb = milvus::proto;
// Generate `rows * dim` pseudo-random floats to be used as test vectors.
//
// Every value is drawn from a uniform integer distribution over [0, 100]
// and then stored as a float, so all components are whole numbers. The
// engine is seeded with `seed`, so equal arguments produce equal data.
//
// @param rows number of vectors
// @param dim  number of components per vector
// @param seed seed of the std::mt19937 engine
// @return shared flat array of rows * dim floats
std::shared_ptr<float[]>
GenRandomFloatVecData(int rows, int dim, int seed = 42) {
    // Widen before multiplying so the element count is not computed in int.
    const size_t total = static_cast<size_t>(rows) * static_cast<size_t>(dim);
    std::shared_ptr<float[]> vecs(new float[total]);
    std::mt19937 rng(seed);
    // Integer bounds for an integer distribution (previously 0.0 / 100.0,
    // which were implicitly converted to these same values).
    std::uniform_int_distribution<> distrib(0, 100);
    for (size_t i = 0; i < total; ++i) {
        vecs[i] = static_cast<float>(distrib(rng));
    }
    // Return the local by name: wrapping it in std::move would block NRVO.
    return vecs;
}
// Compute how many of the returned ids also appear in the ground truth,
// as a fraction of all returned ids.
//
// For each of the `nq` queries, the first min(gt_k, res_k) ground-truth ids
// of that query are compared (as a sorted multiset intersection) with the
// `res_k` result ids of that query.
//
// @param nq      number of queries
// @param gt_ids  ground-truth ids, `gt_k` per query, stored contiguously
// @param gt_k    number of ground-truth ids per query
// @param res_ids result ids, `res_k` per query, stored contiguously
// @param res_k   number of result ids per query
// @return matched ids / (nq * res_k); 0 when nq or res_k is 0
inline float
GetKnnSearchRecall(
    size_t nq, int64_t* gt_ids, size_t gt_k, int64_t* res_ids, size_t res_k) {
    // Nothing to compare; avoids a 0/0 division that would yield NaN.
    if (nq == 0 || res_k == 0) {
        return 0.0f;
    }
    // Never read past the end of a ground-truth row when gt_k < res_k.
    const size_t gt_take = std::min(gt_k, res_k);

    size_t matched_num = 0;
    std::vector<int64_t> common;
    for (size_t i = 0; i < nq; ++i) {
        const int64_t* gt_row = gt_ids + i * gt_k;
        const int64_t* res_row = res_ids + i * res_k;
        std::vector<int64_t> expected(gt_row, gt_row + gt_take);
        std::vector<int64_t> actual(res_row, res_row + res_k);
        // set_intersection requires sorted ranges.
        std::sort(expected.begin(), expected.end());
        std::sort(actual.begin(), actual.end());

        // The intersection can never be larger than the smaller input.
        common.resize(std::min(expected.size(), actual.size()));
        const auto last = std::set_intersection(expected.begin(),
                                                expected.end(),
                                                actual.begin(),
                                                actual.end(),
                                                common.begin());
        matched_num += static_cast<size_t>(last - common.begin());
    }
    return static_cast<float>(matched_num) /
           (static_cast<float>(nq) * static_cast<float>(res_k));
}
using Param = const char*;
// Value-parameterized fixture for sealed-segment interim ("binlog") index
// tests. The test parameter is the metric type name.
//
// SetUp() creates a schema with one float vector field ("fakevec") and one
// INT64 primary key field ("counter"), and generates `data_n` vectors of
// dimension `data_d` as the raw vector column. Each test creates `segment`
// itself.
class BinlogIndexTest : public ::testing::TestWithParam<Param> {
    void
    SetUp() override {
        metricType = GetParam();

        schema = std::make_shared<Schema>();
        vec_field_id = schema->AddDebugField(
            "fakevec", DataType::VECTOR_FLOAT, data_d, metricType);
        auto i64_fid = schema->AddDebugField("counter", DataType::INT64);
        schema->set_primary_field_id(i64_fid);

        // generate vector field data
        vec_data = GenRandomFloatVecData(data_n, data_d);
        vec_field_data =
            storage::CreateFieldData(DataType::VECTOR_FLOAT, data_d);
        vec_field_data->FillFieldData(vec_data.get(), data_n);
    }

 public:
    // Build collection index meta that declares `index_type` (with the
    // fixture's metric type and nlist 1024) for the vector field.
    //
    // Side effect: sets chunk_rows to 1024 and enables the interim segment
    // index on SegcoreConfig::default_config().
    IndexMetaPtr
    GetCollectionIndexMeta(std::string index_type) {
        std::map<std::string, std::string> index_params = {
            {"index_type", index_type},
            {"metric_type", metricType},
            {"nlist", "1024"}};
        // Derive the dimension from the fixture so it cannot drift from the
        // generated data.
        std::map<std::string, std::string> type_params = {
            {"dim", std::to_string(data_d)}};
        FieldIndexMeta fieldIndexMeta(
            vec_field_id, std::move(index_params), std::move(type_params));

        auto& config = SegcoreConfig::default_config();
        config.set_chunk_rows(1024);
        config.set_enable_interim_segment_index(true);

        std::map<FieldId, FieldIndexMeta> filedMap = {
            {vec_field_id, fieldIndexMeta}};
        IndexMetaPtr metaPtr =
            std::make_shared<CollectionIndexMeta>(226985, std::move(filedMap));
        // Returned by name; std::move here would only inhibit copy elision.
        return metaPtr;
    }

    // Load the RowID and Timestamp system columns (generated by DataGen for
    // `data_n` rows) into `segment`. `segment` must already be created.
    void
    LoadOtherFields() {
        auto dataset = DataGen(schema, data_n);

        // load id
        auto row_id_data =
            std::make_shared<milvus::storage::FieldData<int64_t>>(
                DataType::INT64);
        row_id_data->FillFieldData(dataset.row_ids_.data(), data_n);
        auto row_id_load_info =
            FieldDataInfo{RowFieldID.get(),
                          data_n,
                          std::vector<storage::FieldDataPtr>{row_id_data}};
        segment->LoadFieldData(RowFieldID, row_id_load_info);

        // load ts
        auto ts_data = std::make_shared<milvus::storage::FieldData<int64_t>>(
            DataType::INT64);
        ts_data->FillFieldData(dataset.timestamps_.data(), data_n);
        auto ts_load_info =
            FieldDataInfo{TimestampFieldID.get(),
                          data_n,
                          std::vector<storage::FieldDataPtr>{ts_data}};
        segment->LoadFieldData(TimestampFieldID, ts_load_info);
    }

 protected:
    milvus::SchemaPtr schema;
    // Metric type name taken from the test parameter in SetUp().
    const char* metricType = nullptr;
    size_t data_n = 10000;  // number of generated vectors
    size_t data_d = 128;    // vector dimension
    size_t topk = 10;
    milvus::storage::FieldDataPtr vec_field_data = nullptr;
    milvus::segcore::SegmentSealedPtr segment = nullptr;
    milvus::FieldId vec_field_id;
    // Backing storage of the generated vectors; also used to build the
    // "real" index in tests.
    std::shared_ptr<float[]> vec_data;
};
// Instantiate every BinlogIndexTest case once per listed metric type;
// only L2 is listed here.
INSTANTIATE_TEST_CASE_P(MetricTypeParameters,
BinlogIndexTest,
::testing::Values(knowhere::metric::L2));
// End-to-end check of the interim binlog index on a sealed segment:
//   1. loading raw vector data builds the interim index and releases the
//      raw column,
//   2. a search served by the interim index returns a full result set,
//   3. after a real IVF_FLAT index is loaded, its search results overlap
//      with the interim index results by more than 45%.
TEST_P(BinlogIndexTest, Accuracy) {
    IndexMetaPtr collection_index_meta =
        GetCollectionIndexMeta(knowhere::IndexEnum::INDEX_FAISS_IVFFLAT);

    segment = CreateSealedSegment(schema, collection_index_meta);
    LoadOtherFields();

    auto& segcore_config = milvus::segcore::SegcoreConfig::default_config();
    segcore_config.set_enable_interim_segment_index(true);
    segcore_config.set_nprobe(32);

    // 1. load field data, and build binlog index for binlog data
    auto field_data_info =
        FieldDataInfo{vec_field_id.get(),
                      data_n,
                      std::vector<storage::FieldDataPtr>{vec_field_data}};
    segment->LoadFieldData(vec_field_id, field_data_info);

    // assert segment has been built binlog index
    EXPECT_TRUE(segment->HasIndex(vec_field_id));
    EXPECT_EQ(segment->get_row_count(), data_n);
    EXPECT_FALSE(segment->HasFieldData(vec_field_id));

    // 2. search binlog index
    auto num_queries = 10;
    // Same default seed as the stored data, so the queries equal the first
    // `num_queries` stored vectors.
    auto query_ptr = GenRandomFloatVecData(num_queries, data_d);

    milvus::proto::plan::PlanNode plan_node;
    auto vector_anns = plan_node.mutable_vector_anns();
    vector_anns->set_vector_type(milvus::proto::plan::VectorType::FloatVector);
    vector_anns->set_placeholder_tag("$0");
    vector_anns->set_field_id(vec_field_id.get());
    auto query_info = vector_anns->mutable_query_info();
    query_info->set_topk(topk);
    query_info->set_round_decimal(3);
    query_info->set_metric_type(metricType);
    query_info->set_search_params(R"({"nprobe": 1024})");
    auto plan_str = plan_node.SerializeAsString();

    auto ph_group_raw =
        CreatePlaceholderGroupFromBlob(num_queries, data_d, query_ptr.get());

    auto plan = milvus::query::CreateSearchPlanByExpr(
        *schema, plan_str.data(), plan_str.size());
    auto ph_group =
        ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString());

    auto binlog_index_sr = segment->Search(plan.get(), ph_group.get());
    ASSERT_EQ(binlog_index_sr->total_nq_, num_queries);
    EXPECT_EQ(binlog_index_sr->unity_topK_, topk);
    EXPECT_EQ(binlog_index_sr->distances_.size(), num_queries * topk);
    EXPECT_EQ(binlog_index_sr->seg_offsets_.size(), num_queries * topk);

    // 3. update vector index
    {
        milvus::index::CreateIndexInfo create_index_info;
        create_index_info.field_type = DataType::VECTOR_FLOAT;
        create_index_info.metric_type = metricType;
        create_index_info.index_type = knowhere::IndexEnum::INDEX_FAISS_IVFFLAT;
        create_index_info.index_engine_version =
            knowhere::Version::GetCurrentVersion().VersionNumber();
        auto indexing = milvus::index::IndexFactory::GetInstance().CreateIndex(
            create_index_info, milvus::storage::FileManagerContext());

        auto build_conf =
            knowhere::Json{{knowhere::meta::METRIC_TYPE, metricType},
                           {knowhere::meta::DIM, std::to_string(data_d)},
                           {knowhere::indexparam::NLIST, "1024"}};

        auto database = knowhere::GenDataSet(data_n, data_d, vec_data.get());
        indexing->BuildWithDataset(database, build_conf);

        LoadIndexInfo load_info;
        load_info.field_id = vec_field_id.get();

        load_info.index = std::move(indexing);
        load_info.index_params["metric_type"] = metricType;
        // Dropping the field data also drops the interim index, so the real
        // index can be loaded in its place.
        segment->DropFieldData(vec_field_id);
        ASSERT_NO_THROW(segment->LoadIndex(load_info));
        EXPECT_TRUE(segment->HasIndex(vec_field_id));
        EXPECT_EQ(segment->get_row_count(), data_n);
        EXPECT_FALSE(segment->HasFieldData(vec_field_id));

        auto ivf_sr = segment->Search(plan.get(), ph_group.get());
        // Treat the interim index results as the reference set.
        auto overlap = GetKnnSearchRecall(num_queries,
                                          binlog_index_sr->seg_offsets_.data(),
                                          topk,
                                          ivf_sr->seg_offsets_.data(),
                                          topk);
        ASSERT_GT(overlap, 0.45);
    }
}
// With the interim segment index disabled, loading raw vector data must keep
// the raw column and build no index; a real IVF_FLAT index can still be
// loaded afterwards and replaces the raw column.
TEST_P(BinlogIndexTest, DisableInterimIndex) {
IndexMetaPtr collection_index_meta =
GetCollectionIndexMeta(knowhere::IndexEnum::INDEX_FAISS_IVFFLAT);
segment = CreateSealedSegment(schema, collection_index_meta);
LoadOtherFields();
// GetCollectionIndexMeta() enabled the interim index on the default config;
// turn it off again before the vector field is loaded.
SegcoreSetEnableInterimSegmentIndex(false);
auto field_data_info =
FieldDataInfo{vec_field_id.get(),
data_n,
std::vector<storage::FieldDataPtr>{vec_field_data}};
segment->LoadFieldData(vec_field_id, field_data_info);
// No interim index was built: raw data is present, no index is reported.
EXPECT_FALSE(segment->HasIndex(vec_field_id));
EXPECT_EQ(segment->get_row_count(), data_n);
EXPECT_TRUE(segment->HasFieldData(vec_field_id));
// load vector index
milvus::index::CreateIndexInfo create_index_info;
create_index_info.field_type = DataType::VECTOR_FLOAT;
create_index_info.metric_type = metricType;
create_index_info.index_type = knowhere::IndexEnum::INDEX_FAISS_IVFFLAT;
create_index_info.index_engine_version =
knowhere::Version::GetCurrentVersion().VersionNumber();
auto indexing = milvus::index::IndexFactory::GetInstance().CreateIndex(
create_index_info, milvus::storage::FileManagerContext());
auto build_conf =
knowhere::Json{{knowhere::meta::METRIC_TYPE, metricType},
{knowhere::meta::DIM, std::to_string(data_d)},
{knowhere::indexparam::NLIST, "1024"}};
// Build the real index over the same generated vectors held by the fixture.
auto database = knowhere::GenDataSet(data_n, data_d, vec_data.get());
indexing->BuildWithDataset(database, build_conf);
LoadIndexInfo load_info;
load_info.field_id = vec_field_id.get();
load_info.index = std::move(indexing);
load_info.index_params["metric_type"] = metricType;
// Release the raw column before loading the index.
segment->DropFieldData(vec_field_id);
ASSERT_NO_THROW(segment->LoadIndex(load_info));
// The segment now reports an index and no raw field data.
EXPECT_TRUE(segment->HasIndex(vec_field_id));
EXPECT_EQ(segment->get_row_count(), data_n);
EXPECT_FALSE(segment->HasFieldData(vec_field_id));
}
// When the collection declares an IDMAP index for the vector field, loading
// raw vector data must not produce an interim index: the segment keeps the
// raw column and reports no index.
TEST_P(BinlogIndexTest, LoadBingLogWihIDMAP) {
    auto index_meta =
        GetCollectionIndexMeta(knowhere::IndexEnum::INDEX_FAISS_IDMAP);
    segment = CreateSealedSegment(schema, index_meta);
    LoadOtherFields();

    FieldDataInfo vec_load_info{
        vec_field_id.get(),
        data_n,
        std::vector<storage::FieldDataPtr>{vec_field_data}};
    segment->LoadFieldData(vec_field_id, vec_load_info);

    EXPECT_FALSE(segment->HasIndex(vec_field_id));
    EXPECT_EQ(segment->get_row_count(), data_n);
    EXPECT_TRUE(segment->HasFieldData(vec_field_id));
}
// A sealed segment created without collection index meta must not build an
// interim index even when the feature is enabled: the raw column is kept
// and no index is reported.
TEST_P(BinlogIndexTest, LoadBinlogWithoutIndexMeta) {
    // Enable the feature first so that the missing index meta is the only
    // reason for not building an interim index.
    SegcoreSetEnableInterimSegmentIndex(true);

    // Pass no index meta; supplying one here would test a different path
    // than the one this test is named for.
    segment = CreateSealedSegment(schema, nullptr);

    auto field_data_info =
        FieldDataInfo{vec_field_id.get(),
                      data_n,
                      std::vector<storage::FieldDataPtr>{vec_field_data}};
    segment->LoadFieldData(vec_field_id, field_data_info);

    EXPECT_FALSE(segment->HasIndex(vec_field_id));
    EXPECT_EQ(segment->get_row_count(), data_n);
    EXPECT_TRUE(segment->HasFieldData(vec_field_id));
}

View File

@ -177,7 +177,7 @@ TEST(Float16, GetVector) {
vec, std::move(index_params), std::move(type_params));
auto config = SegcoreConfig::default_config();
config.set_chunk_rows(1024);
config.set_enable_growing_segment_index(true);
config.set_enable_interim_segment_index(true);
std::map<FieldId, FieldIndexMeta> filedMap = {{vec, fieldIndexMeta}};
IndexMetaPtr metaPtr =
std::make_shared<CollectionIndexMeta>(100000, std::move(filedMap));

View File

@ -134,7 +134,7 @@ TEST(Growing, FillData) {
vec, std::move(index_params), std::move(type_params));
auto config = SegcoreConfig::default_config();
config.set_chunk_rows(1024);
config.set_enable_growing_segment_index(true);
config.set_enable_interim_segment_index(true);
std::map<FieldId, FieldIndexMeta> filedMap = {{vec, fieldIndexMeta}};
IndexMetaPtr metaPtr =
std::make_shared<CollectionIndexMeta>(100000, std::move(filedMap));

View File

@ -37,7 +37,7 @@ TEST(GrowingIndex, Correctness) {
vec, std::move(index_params), std::move(type_params));
auto& config = SegcoreConfig::default_config();
config.set_chunk_rows(1024);
config.set_enable_growing_segment_index(true);
config.set_enable_interim_segment_index(true);
std::map<FieldId, FieldIndexMeta> filedMap = {{vec, fieldIndexMeta}};
IndexMetaPtr metaPtr =
std::make_shared<CollectionIndexMeta>(226985, std::move(filedMap));
@ -135,7 +135,7 @@ TEST(GrowingIndex, MissIndexMeta) {
auto& config = SegcoreConfig::default_config();
config.set_chunk_rows(1024);
config.set_enable_growing_segment_index(true);
config.set_enable_interim_segment_index(true);
auto segment = CreateGrowingSegment(schema, nullptr);
}
@ -175,7 +175,7 @@ TEST_P(GrowingIndexGetVectorTest, GetVector) {
vec, std::move(index_params), std::move(type_params));
auto& config = SegcoreConfig::default_config();
config.set_chunk_rows(1024);
config.set_enable_growing_segment_index(true);
config.set_enable_interim_segment_index(true);
std::map<FieldId, FieldIndexMeta> filedMap = {{vec, fieldIndexMeta}};
IndexMetaPtr metaPtr =
std::make_shared<CollectionIndexMeta>(100000, std::move(filedMap));

View File

@ -237,6 +237,7 @@ func (BinaryExpr_BinaryOp) EnumDescriptor() ([]byte, []int) {
type GenericValue struct {
// Types that are valid to be assigned to Val:
//
// *GenericValue_BoolVal
// *GenericValue_Int64Val
// *GenericValue_FloatVal
@ -1297,6 +1298,7 @@ var xxx_messageInfo_AlwaysTrueExpr proto.InternalMessageInfo
type Expr struct {
// Types that are valid to be assigned to Expr:
//
// *Expr_TermExpr
// *Expr_UnaryExpr
// *Expr_BinaryExpr
@ -1668,6 +1670,7 @@ func (m *QueryPlanNode) GetLimit() int64 {
type PlanNode struct {
// Types that are valid to be assigned to Node:
//
// *PlanNode_VectorAnns
// *PlanNode_Predicates
// *PlanNode_Query

View File

@ -948,6 +948,11 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
predictDiskUsage += uint64(getBinlogDataSize(fieldBinlog))
} else {
predictMemUsage += uint64(getBinlogDataSize(fieldBinlog))
enableBinlogIndex := paramtable.Get().QueryNodeCfg.EnableInterimSegmentIndex.GetAsBool()
if enableBinlogIndex {
buildBinlogIndexRate := paramtable.Get().QueryNodeCfg.InterimIndexMemExpandRate.GetAsFloat()
predictMemUsage += uint64(float32(getBinlogDataSize(fieldBinlog)) * float32(buildBinlogIndexRate))
}
}
}
}

View File

@ -197,13 +197,13 @@ func (node *QueryNode) InitSegcore() error {
cKnowhereThreadPoolSize := C.uint32_t(paramtable.Get().QueryNodeCfg.KnowhereThreadPoolSize.GetAsUint32())
C.SegcoreSetKnowhereSearchThreadPoolNum(cKnowhereThreadPoolSize)
enableGrowingIndex := C.bool(paramtable.Get().QueryNodeCfg.EnableGrowingSegmentIndex.GetAsBool())
C.SegcoreSetEnableGrowingSegmentIndex(enableGrowingIndex)
enableInterimIndex := C.bool(paramtable.Get().QueryNodeCfg.EnableInterimSegmentIndex.GetAsBool())
C.SegcoreSetEnableInterimSegmentIndex(enableInterimIndex)
nlist := C.int64_t(paramtable.Get().QueryNodeCfg.GrowingIndexNlist.GetAsInt64())
nlist := C.int64_t(paramtable.Get().QueryNodeCfg.InterimIndexNlist.GetAsInt64())
C.SegcoreSetNlist(nlist)
nprobe := C.int64_t(paramtable.Get().QueryNodeCfg.GrowingIndexNProbe.GetAsInt64())
nprobe := C.int64_t(paramtable.Get().QueryNodeCfg.InterimIndexNProbe.GetAsInt64())
C.SegcoreSetNprobe(nprobe)
// override segcore SIMD type

View File

@ -485,6 +485,21 @@ func (suite *ServiceSuite) TestUnsubDmChannels_Failed() {
suite.Equal(commonpb.ErrorCode_NotReadyServe, status.GetErrorCode())
}
// genSegmentIndexInfos builds an index info list from the field index infos
// of the first segment in loadInfo.
//
// Only loadInfo[0] is consulted. Every returned IndexInfo carries the suite's
// collection ID together with the field ID, index name and index params
// copied from the corresponding field index info. An empty loadInfo yields an
// empty, non-nil list instead of panicking on the index access.
func (suite *ServiceSuite) genSegmentIndexInfos(loadInfo []*querypb.SegmentLoadInfo) []*indexpb.IndexInfo {
	// Guard the loadInfo[0] access below: indexing an empty slice panics.
	if len(loadInfo) == 0 {
		return []*indexpb.IndexInfo{}
	}

	fieldIndexInfos := loadInfo[0].IndexInfos
	// The final length is known up front, so allocate once.
	indexInfoList := make([]*indexpb.IndexInfo, 0, len(fieldIndexInfos))
	for _, info := range fieldIndexInfos {
		indexInfoList = append(indexInfoList, &indexpb.IndexInfo{
			CollectionID: suite.collectionID,
			FieldID:      info.GetFieldID(),
			IndexName:    info.GetIndexName(),
			IndexParams:  info.GetIndexParams(),
		})
	}
	return indexInfoList
}
func (suite *ServiceSuite) genSegmentLoadInfos(schema *schemapb.CollectionSchema) []*querypb.SegmentLoadInfo {
ctx := context.Background()
@ -665,6 +680,8 @@ func (suite *ServiceSuite) TestLoadIndex_Success() {
info.IndexInfos = nil
return info
})
// Generate the index infos used to set the index meta on the load request.
indexInfoList := suite.genSegmentIndexInfos(infos)
req := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
@ -676,7 +693,7 @@ func (suite *ServiceSuite) TestLoadIndex_Success() {
Schema: schema,
NeedTransfer: false,
LoadScope: querypb.LoadScope_Full,
IndexInfoList: []*indexpb.IndexInfo{{}},
IndexInfoList: indexInfoList,
}
// Load segment

View File

@ -1592,9 +1592,10 @@ type queryNodeConfig struct {
// segcore
KnowhereThreadPoolSize ParamItem `refreshable:"false"`
ChunkRows ParamItem `refreshable:"false"`
EnableGrowingSegmentIndex ParamItem `refreshable:"false"`
GrowingIndexNlist ParamItem `refreshable:"false"`
GrowingIndexNProbe ParamItem `refreshable:"false"`
EnableInterimSegmentIndex ParamItem `refreshable:"false"`
InterimIndexNlist ParamItem `refreshable:"false"`
InterimIndexNProbe ParamItem `refreshable:"false"`
InterimIndexMemExpandRate ParamItem `refreshable:"false"`
// memory limit
LoadMemoryUsageFactor ParamItem `refreshable:"true"`
@ -1715,42 +1716,51 @@ func (p *queryNodeConfig) init(base *BaseTable) {
}
p.ChunkRows.Init(base.mgr)
p.EnableGrowingSegmentIndex = ParamItem{
Key: "queryNode.segcore.growing.enableIndex",
p.EnableInterimSegmentIndex = ParamItem{
Key: "queryNode.segcore.interimIndex.enableIndex",
Version: "2.0.0",
DefaultValue: "false",
Doc: "Enable segment growing with index to accelerate vector search.",
Doc: "Enable segment build with index to accelerate vector search when segment is in growing or binlog.",
Export: true,
}
p.EnableGrowingSegmentIndex.Init(base.mgr)
p.EnableInterimSegmentIndex.Init(base.mgr)
p.GrowingIndexNlist = ParamItem{
Key: "queryNode.segcore.growing.nlist",
p.InterimIndexNlist = ParamItem{
Key: "queryNode.segcore.interimIndex.nlist",
Version: "2.0.0",
DefaultValue: "128",
Doc: "growing index nlist, recommend to set sqrt(chunkRows), must smaller than chunkRows/8",
Doc: "temp index nlist, recommend to set sqrt(chunkRows), must smaller than chunkRows/8",
Export: true,
}
p.GrowingIndexNlist.Init(base.mgr)
p.InterimIndexNlist.Init(base.mgr)
p.GrowingIndexNProbe = ParamItem{
Key: "queryNode.segcore.growing.nprobe",
p.InterimIndexMemExpandRate = ParamItem{
Key: "queryNode.segcore.interimIndex.memExpansionRate",
Version: "2.0.0",
DefaultValue: "1.15",
Doc: "extra memory needed by building interim index",
Export: true,
}
p.InterimIndexMemExpandRate.Init(base.mgr)
p.InterimIndexNProbe = ParamItem{
Key: "queryNode.segcore.interimIndex.nprobe",
Version: "2.0.0",
Formatter: func(v string) string {
defaultNprobe := p.GrowingIndexNlist.GetAsInt64() / 8
defaultNprobe := p.InterimIndexNlist.GetAsInt64() / 8
nprobe := getAsInt64(v)
if nprobe == 0 {
nprobe = defaultNprobe
}
if nprobe > p.GrowingIndexNlist.GetAsInt64() {
return p.GrowingIndexNlist.GetValue()
if nprobe > p.InterimIndexNlist.GetAsInt64() {
return p.InterimIndexNlist.GetValue()
}
return strconv.FormatInt(nprobe, 10)
},
Doc: "nprobe to search small index, based on your accuracy requirement, must smaller than nlist",
Export: true,
}
p.GrowingIndexNProbe.Init(base.mgr)
p.InterimIndexNProbe.Init(base.mgr)
p.LoadMemoryUsageFactor = ParamItem{
Key: "queryNode.loadMemoryUsageFactor",

View File

@ -299,10 +299,10 @@ func TestComponentParam(t *testing.T) {
chunkRows := Params.ChunkRows.GetAsInt64()
assert.Equal(t, int64(1024), chunkRows)
nlist := Params.GrowingIndexNlist.GetAsInt64()
nlist := Params.InterimIndexNlist.GetAsInt64()
assert.Equal(t, int64(128), nlist)
nprobe := Params.GrowingIndexNProbe.GetAsInt64()
nprobe := Params.InterimIndexNProbe.GetAsInt64()
assert.Equal(t, int64(16), nprobe)
assert.Equal(t, true, Params.GroupEnabled.GetAsBool())
@ -321,17 +321,17 @@ func TestComponentParam(t *testing.T) {
chunkRows = Params.ChunkRows.GetAsInt64()
assert.Equal(t, int64(8192), chunkRows)
enableGrowingIndex := Params.EnableGrowingSegmentIndex.GetAsBool()
assert.Equal(t, true, enableGrowingIndex)
enableInterimIndex := Params.EnableInterimSegmentIndex.GetAsBool()
assert.Equal(t, true, enableInterimIndex)
params.Save("queryNode.segcore.growing.enableIndex", "true")
enableGrowingIndex = Params.EnableGrowingSegmentIndex.GetAsBool()
assert.Equal(t, true, enableGrowingIndex)
params.Save("queryNode.segcore.interimIndex.enableIndex", "true")
enableInterimIndex = Params.EnableInterimSegmentIndex.GetAsBool()
assert.Equal(t, true, enableInterimIndex)
nlist = Params.GrowingIndexNlist.GetAsInt64()
nlist = Params.InterimIndexNlist.GetAsInt64()
assert.Equal(t, int64(128), nlist)
nprobe = Params.GrowingIndexNProbe.GetAsInt64()
nprobe = Params.InterimIndexNProbe.GetAsInt64()
assert.Equal(t, int64(16), nprobe)
params.Remove("queryNode.segcore.growing.nlist")

View File

@ -2986,7 +2986,7 @@ class TestCollectionSearch(TestcaseBase):
filter_ids.append(_id)
# 2. create index
index_param = {"index_type": "IVF_FLAT", "metric_type": "COSINE", "params": {"nlist": 100}}
index_param = {"index_type": "FLAT", "metric_type": "COSINE", "params": {}}
collection_w.create_index("float_vector", index_param)
collection_w.load()
@ -7379,7 +7379,7 @@ class TestCollectionRangeSearch(TestcaseBase):
filter_ids.append(_id)
# 2. create index
index_param = {"index_type": "IVF_FLAT",
index_param = {"index_type": "FLAT",
"metric_type": "L2", "params": {"nlist": 100}}
collection_w.create_index("float_vector", index_param)
collection_w.load()