mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
fix: [StorageV2][AddField] Handle lack binlog rows in storage v2 (#42186)
Related to #39173 #39718 In storage v2, the `lack_bin_rows` cannot be used since field id is not column group id, which will not be matched forever. --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
297331b2cc
commit
cc42d49769
@ -95,6 +95,7 @@ const std::string PARTITION_KEY_ISOLATION_KEY = "partition_key_isolation";
|
||||
const std::string STORAGE_VERSION_KEY = "storage_version";
|
||||
const std::string DIM_KEY = "dim";
|
||||
const std::string DATA_TYPE_KEY = "data_type";
|
||||
const std::string INDEX_NUM_ROWS_KEY = "index_num_rows";
|
||||
|
||||
// storage version
|
||||
const int64_t STORAGE_V1 = 1;
|
||||
|
||||
@ -71,27 +71,9 @@ BitmapIndex<T>::Build(const Config& config) {
|
||||
if (is_built_) {
|
||||
return;
|
||||
}
|
||||
auto field_datas = file_manager_->CacheRawDataToMemory(config);
|
||||
|
||||
auto lack_binlog_rows =
|
||||
GetValueFromConfig<int64_t>(config, "lack_binlog_rows");
|
||||
if (lack_binlog_rows.has_value() && lack_binlog_rows.value() > 0) {
|
||||
auto field_schema = file_manager_->GetFieldDataMeta().field_schema;
|
||||
auto default_value = [&]() -> std::optional<DefaultValueType> {
|
||||
if (!field_schema.has_default_value()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
return field_schema.default_value();
|
||||
}();
|
||||
auto field_data = storage::CreateFieldData(
|
||||
static_cast<DataType>(field_schema.data_type()),
|
||||
true,
|
||||
1,
|
||||
lack_binlog_rows.value());
|
||||
field_data->FillFieldData(default_value, lack_binlog_rows.value());
|
||||
field_datas.insert(field_datas.begin(), field_data);
|
||||
}
|
||||
|
||||
auto field_datas =
|
||||
storage::CacheRawDataAndFillMissing(file_manager_, config);
|
||||
BuildWithFieldData(field_datas);
|
||||
}
|
||||
|
||||
|
||||
@ -244,26 +244,8 @@ HybridScalarIndex<T>::Build(const Config& config) {
|
||||
GetBitmapCardinalityLimitFromConfig(config);
|
||||
LOG_INFO("config bitmap cardinality limit to {}",
|
||||
bitmap_index_cardinality_limit_);
|
||||
auto field_datas = mem_file_manager_->CacheRawDataToMemory(config);
|
||||
|
||||
auto lack_binlog_rows =
|
||||
GetValueFromConfig<int64_t>(config, "lack_binlog_rows");
|
||||
if (lack_binlog_rows.has_value() && lack_binlog_rows.value() > 0) {
|
||||
auto field_schema = mem_file_manager_->GetFieldDataMeta().field_schema;
|
||||
auto default_value = [&]() -> std::optional<DefaultValueType> {
|
||||
if (!field_schema.has_default_value()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
return field_schema.default_value();
|
||||
}();
|
||||
auto field_data = storage::CreateFieldData(
|
||||
static_cast<DataType>(field_schema.data_type()),
|
||||
true,
|
||||
1,
|
||||
lack_binlog_rows.value());
|
||||
field_data->FillFieldData(default_value, lack_binlog_rows.value());
|
||||
field_datas.insert(field_datas.begin(), field_data);
|
||||
}
|
||||
auto field_datas =
|
||||
storage::CacheRawDataAndFillMissing(mem_file_manager_, config);
|
||||
|
||||
SelectIndexBuildType(field_datas);
|
||||
BuildInternal(field_datas);
|
||||
|
||||
@ -165,25 +165,8 @@ InvertedIndexTantivy<T>::Upload(const Config& config) {
|
||||
template <typename T>
|
||||
void
|
||||
InvertedIndexTantivy<T>::Build(const Config& config) {
|
||||
auto field_datas = mem_file_manager_->CacheRawDataToMemory(config);
|
||||
auto lack_binlog_rows =
|
||||
GetValueFromConfig<int64_t>(config, "lack_binlog_rows");
|
||||
if (lack_binlog_rows.has_value() && lack_binlog_rows.value() > 0) {
|
||||
auto field_schema = mem_file_manager_->GetFieldDataMeta().field_schema;
|
||||
auto default_value = [&]() -> std::optional<DefaultValueType> {
|
||||
if (!field_schema.has_default_value()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
return field_schema.default_value();
|
||||
}();
|
||||
auto field_data = storage::CreateFieldData(
|
||||
static_cast<DataType>(field_schema.data_type()),
|
||||
true,
|
||||
1,
|
||||
lack_binlog_rows.value());
|
||||
field_data->FillFieldData(default_value, lack_binlog_rows.value());
|
||||
field_datas.insert(field_datas.begin(), field_data);
|
||||
}
|
||||
auto field_datas =
|
||||
storage::CacheRawDataAndFillMissing(mem_file_manager_, config);
|
||||
BuildWithFieldData(field_datas);
|
||||
}
|
||||
|
||||
|
||||
@ -51,26 +51,8 @@ ScalarIndexSort<T>::Build(const Config& config) {
|
||||
if (is_built_) {
|
||||
return;
|
||||
}
|
||||
auto field_datas = file_manager_->CacheRawDataToMemory(config);
|
||||
|
||||
auto lack_binlog_rows =
|
||||
GetValueFromConfig<int64_t>(config, "lack_binlog_rows");
|
||||
if (lack_binlog_rows.has_value() && lack_binlog_rows.value() > 0) {
|
||||
auto field_schema = file_manager_->GetFieldDataMeta().field_schema;
|
||||
auto default_value = [&]() -> std::optional<DefaultValueType> {
|
||||
if (!field_schema.has_default_value()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
return field_schema.default_value();
|
||||
}();
|
||||
auto field_data = storage::CreateFieldData(
|
||||
static_cast<DataType>(field_schema.data_type()),
|
||||
true,
|
||||
1,
|
||||
lack_binlog_rows.value());
|
||||
field_data->FillFieldData(default_value, lack_binlog_rows.value());
|
||||
field_datas.insert(field_datas.begin(), field_data);
|
||||
}
|
||||
auto field_datas =
|
||||
storage::CacheRawDataAndFillMissing(file_manager_, config);
|
||||
|
||||
BuildWithFieldData(field_datas);
|
||||
}
|
||||
|
||||
@ -65,26 +65,8 @@ StringIndexMarisa::Build(const Config& config) {
|
||||
if (built_) {
|
||||
PanicInfo(IndexAlreadyBuild, "index has been built");
|
||||
}
|
||||
auto field_datas = file_manager_->CacheRawDataToMemory(config);
|
||||
|
||||
auto lack_binlog_rows =
|
||||
GetValueFromConfig<int64_t>(config, "lack_binlog_rows");
|
||||
if (lack_binlog_rows.has_value() && lack_binlog_rows.value() > 0) {
|
||||
auto field_schema = file_manager_->GetFieldDataMeta().field_schema;
|
||||
auto default_value = [&]() -> std::optional<DefaultValueType> {
|
||||
if (!field_schema.has_default_value()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
return field_schema.default_value();
|
||||
}();
|
||||
auto field_data = storage::CreateFieldData(
|
||||
static_cast<DataType>(field_schema.data_type()),
|
||||
true,
|
||||
1,
|
||||
lack_binlog_rows.value());
|
||||
field_data->FillFieldData(default_value, lack_binlog_rows.value());
|
||||
field_datas.insert(field_datas.begin(), field_data);
|
||||
}
|
||||
auto field_datas =
|
||||
storage::CacheRawDataAndFillMissing(file_manager_, config);
|
||||
|
||||
BuildWithFieldData(field_datas);
|
||||
}
|
||||
|
||||
@ -164,10 +164,10 @@ get_config(std::unique_ptr<milvus::proto::indexcgo::BuildIndexInfo>& info) {
|
||||
if (info->opt_fields().size()) {
|
||||
config[VEC_OPT_FIELDS] = get_opt_field(info->opt_fields());
|
||||
}
|
||||
config["lack_binlog_rows"] = info->lack_binlog_rows();
|
||||
if (info->partition_key_isolation()) {
|
||||
config[PARTITION_KEY_ISOLATION_KEY] = info->partition_key_isolation();
|
||||
}
|
||||
config[INDEX_NUM_ROWS_KEY] = info->num_rows();
|
||||
config[STORAGE_VERSION_KEY] = info->storage_version();
|
||||
if (info->storage_version() == STORAGE_V2) {
|
||||
config[SEGMENT_INSERT_FILES_KEY] =
|
||||
|
||||
@ -21,6 +21,7 @@
|
||||
#include "arrow/scalar.h"
|
||||
#include "arrow/type_fwd.h"
|
||||
#include "fmt/format.h"
|
||||
#include "index/Utils.h"
|
||||
#include "log/Log.h"
|
||||
|
||||
#include "common/Consts.h"
|
||||
@ -1026,8 +1027,7 @@ GetFieldDatasFromStorageV2(std::vector<std::vector<std::string>>& remote_files,
|
||||
// remote files might not followed the sequence of column group id,
|
||||
// so we need to put into map<column_group_id, remote_chunk_files>
|
||||
std::unordered_map<int64_t, std::vector<std::string>> column_group_files;
|
||||
for (int i = 0; i < remote_files.size(); i++) {
|
||||
auto& remote_chunk_files = remote_files[i];
|
||||
for (auto& remote_chunk_files : remote_files) {
|
||||
AssertInfo(remote_chunk_files.size() > 0, "remote files size is 0");
|
||||
|
||||
// find second last of / to get group_id
|
||||
@ -1127,4 +1127,44 @@ GetFieldDatasFromStorageV2(std::vector<std::vector<std::string>>& remote_files,
|
||||
return field_data_list;
|
||||
}
|
||||
|
||||
std::vector<FieldDataPtr>
|
||||
CacheRawDataAndFillMissing(const MemFileManagerImplPtr& file_manager,
|
||||
const Config& config) {
|
||||
// download field data
|
||||
auto field_datas = file_manager->CacheRawDataToMemory(config);
|
||||
|
||||
// check storage version
|
||||
auto storage_version =
|
||||
index::GetValueFromConfig<int64_t>(config, STORAGE_VERSION_KEY)
|
||||
.value_or(0);
|
||||
|
||||
int64_t lack_binlog_rows =
|
||||
index::GetValueFromConfig<int64_t>(config, INDEX_NUM_ROWS_KEY)
|
||||
.value_or(0);
|
||||
for (auto& field_data : field_datas) {
|
||||
lack_binlog_rows -= field_data->get_num_rows();
|
||||
}
|
||||
|
||||
if (lack_binlog_rows > 0) {
|
||||
LOG_INFO("create index lack binlog detected, lock row num: {}",
|
||||
lack_binlog_rows);
|
||||
auto field_schema = file_manager->GetFieldDataMeta().field_schema;
|
||||
auto default_value = [&]() -> std::optional<DefaultValueType> {
|
||||
if (!field_schema.has_default_value()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
return field_schema.default_value();
|
||||
}();
|
||||
auto field_data = storage::CreateFieldData(
|
||||
static_cast<DataType>(field_schema.data_type()),
|
||||
true,
|
||||
1,
|
||||
lack_binlog_rows);
|
||||
field_data->FillFieldData(default_value, lack_binlog_rows);
|
||||
field_datas.insert(field_datas.begin(), field_data);
|
||||
}
|
||||
|
||||
return field_datas;
|
||||
}
|
||||
|
||||
} // namespace milvus::storage
|
||||
|
||||
@ -26,6 +26,7 @@
|
||||
#include "knowhere/comp/index_param.h"
|
||||
#include "parquet/schema.h"
|
||||
#include "storage/Event.h"
|
||||
#include "storage/MemFileManagerImpl.h"
|
||||
#include "storage/PayloadStream.h"
|
||||
#include "storage/FileManager.h"
|
||||
#include "storage/BinlogReader.h"
|
||||
@ -233,6 +234,10 @@ SortByPath(std::vector<std::pair<std::string, int64_t>>& paths) {
|
||||
});
|
||||
}
|
||||
|
||||
std::vector<FieldDataPtr>
|
||||
CacheRawDataAndFillMissing(const MemFileManagerImplPtr& file_manager,
|
||||
const Config& config);
|
||||
|
||||
// used only for test
|
||||
inline std::shared_ptr<ArrowDataWrapper>
|
||||
ConvertFieldDataToArrowDataWrapper(const FieldDataPtr& field_data) {
|
||||
|
||||
@ -231,8 +231,9 @@ class ArrayBitmapIndexTest : public testing::Test {
|
||||
config["index_type"] = milvus::index::HYBRID_INDEX_TYPE;
|
||||
config[INSERT_FILES_KEY] = std::vector<std::string>{log_path};
|
||||
config["bitmap_cardinality_limit"] = "100";
|
||||
config[INDEX_NUM_ROWS_KEY] = nb_;
|
||||
if (has_lack_binlog_row_) {
|
||||
config["lack_binlog_rows"] = lack_binlog_row_;
|
||||
config[INDEX_NUM_ROWS_KEY] = nb_ + lack_binlog_row_;
|
||||
}
|
||||
|
||||
{
|
||||
|
||||
@ -288,7 +288,8 @@ TEST_P(BinlogIndexTest, AccuracyWithLoadFieldData) {
|
||||
EXPECT_TRUE(segment->HasIndex(vec_field_id));
|
||||
EXPECT_EQ(segment->get_row_count(), data_n);
|
||||
// only INDEX_FAISS_IVFFLAT has raw data, thus it should release the raw field data.
|
||||
EXPECT_EQ(segment->HasFieldData(vec_field_id), index_type != knowhere::IndexEnum::INDEX_FAISS_IVFFLAT);
|
||||
EXPECT_EQ(segment->HasFieldData(vec_field_id),
|
||||
index_type != knowhere::IndexEnum::INDEX_FAISS_IVFFLAT);
|
||||
auto ivf_sr = segment->Search(plan.get(), ph_group.get(), 1L << 63, 0);
|
||||
auto similary = GetKnnSearchRecall(num_queries,
|
||||
binlog_index_sr->seg_offsets_.data(),
|
||||
@ -385,7 +386,8 @@ TEST_P(BinlogIndexTest, AccuracyWithMapFieldData) {
|
||||
ASSERT_NO_THROW(segment->LoadIndex(load_info));
|
||||
EXPECT_TRUE(segment->HasIndex(vec_field_id));
|
||||
EXPECT_EQ(segment->get_row_count(), data_n);
|
||||
EXPECT_EQ(segment->HasFieldData(vec_field_id), index_type != knowhere::IndexEnum::INDEX_FAISS_IVFFLAT);
|
||||
EXPECT_EQ(segment->HasFieldData(vec_field_id),
|
||||
index_type != knowhere::IndexEnum::INDEX_FAISS_IVFFLAT);
|
||||
auto ivf_sr = segment->Search(plan.get(), ph_group.get(), 1L << 63);
|
||||
auto similary = GetKnnSearchRecall(num_queries,
|
||||
binlog_index_sr->seg_offsets_.data(),
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
#include <unordered_set>
|
||||
#include <memory>
|
||||
|
||||
#include "common/Consts.h"
|
||||
#include "common/Tracer.h"
|
||||
#include "common/Types.h"
|
||||
#include "index/BitmapIndex.h"
|
||||
@ -159,8 +160,9 @@ class BitmapIndexTest : public testing::Test {
|
||||
Config config;
|
||||
config["index_type"] = milvus::index::BITMAP_INDEX_TYPE;
|
||||
config[INSERT_FILES_KEY] = std::vector<std::string>{log_path};
|
||||
config[INDEX_NUM_ROWS_KEY] = nb_;
|
||||
if (has_lack_binlog_row_) {
|
||||
config["lack_binlog_rows"] = lack_binlog_row_;
|
||||
config[INDEX_NUM_ROWS_KEY] = nb_ + lack_binlog_row_;
|
||||
}
|
||||
|
||||
auto build_index =
|
||||
|
||||
@ -160,8 +160,9 @@ class HybridIndexTestV1 : public testing::Test {
|
||||
config["index_type"] = milvus::index::HYBRID_INDEX_TYPE;
|
||||
config[INSERT_FILES_KEY] = std::vector<std::string>{log_path};
|
||||
config["bitmap_cardinality_limit"] = "1000";
|
||||
config[INDEX_NUM_ROWS_KEY] = nb_;
|
||||
if (has_lack_binlog_row_) {
|
||||
config["lack_binlog_rows"] = lack_binlog_row_;
|
||||
config[INDEX_NUM_ROWS_KEY] = nb_ + lack_binlog_row_;
|
||||
}
|
||||
|
||||
{
|
||||
|
||||
@ -167,8 +167,9 @@ test_run() {
|
||||
Config config;
|
||||
config["index_type"] = milvus::index::INVERTED_INDEX_TYPE;
|
||||
config[INSERT_FILES_KEY] = std::vector<std::string>{log_path};
|
||||
config[INDEX_NUM_ROWS_KEY] = nb;
|
||||
if (has_lack_binlog_row_) {
|
||||
config["lack_binlog_rows"] = lack_binlog_row;
|
||||
config[INDEX_NUM_ROWS_KEY] = nb + lack_binlog_row;
|
||||
}
|
||||
|
||||
auto index = indexbuilder::IndexFactory::GetInstance().CreateIndex(
|
||||
@ -558,8 +559,9 @@ test_string() {
|
||||
Config config;
|
||||
config["index_type"] = milvus::index::INVERTED_INDEX_TYPE;
|
||||
config[INSERT_FILES_KEY] = std::vector<std::string>{log_path};
|
||||
config[INDEX_NUM_ROWS_KEY] = nb;
|
||||
if (has_lack_binlog_row_) {
|
||||
config["lack_binlog_rows"] = lack_binlog_row;
|
||||
config[INDEX_NUM_ROWS_KEY] = nb + lack_binlog_row;
|
||||
}
|
||||
|
||||
auto index = indexbuilder::IndexFactory::GetInstance().CreateIndex(
|
||||
|
||||
@ -249,7 +249,8 @@ class SealedSegmentRegexQueryTest : public ::testing::Test {
|
||||
}
|
||||
auto index = index::CreateStringIndexSort();
|
||||
std::vector<uint8_t> buffer(arr.ByteSizeLong());
|
||||
ASSERT_TRUE(arr.SerializeToArray(buffer.data(), arr.ByteSizeLong()));
|
||||
ASSERT_TRUE(
|
||||
arr.SerializeToArray(buffer.data(), arr.ByteSizeLong()));
|
||||
index->BuildWithRawDataForUT(arr.ByteSizeLong(), buffer.data());
|
||||
LoadIndexInfo info{
|
||||
.field_id = schema->get_field_id(FieldName("str")).get(),
|
||||
|
||||
@ -1713,7 +1713,7 @@ func (loader *segmentLoader) LoadIndex(ctx context.Context,
|
||||
}
|
||||
|
||||
// TODO add field info sync between segcore and go segment for storage v2
|
||||
if loadInfo.GetStorageVersion() != 2 {
|
||||
if loadInfo.GetStorageVersion() != storage.StorageV2 {
|
||||
fieldInfo, ok := fieldInfos[info.GetFieldID()]
|
||||
if !ok {
|
||||
return merr.WrapErrParameterInvalid("index info with corresponding field info", "missing field info", strconv.FormatInt(fieldInfo.GetFieldID(), 10))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user