mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: [AddField] Resolve FieldIndexing dangling reference (#43499)
Related to #43113 This PR: - Change member of FieldIndex from `FieldMeta &` to needed `DataType` and dim member resolving dangling reference after schema change - Add double check after acquiring lock to reduce multiple assignment - Change `auto schema` to `auto& schema` to reduce schema copy Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
59bbdd93f5
commit
cc1034fe96
@ -13,6 +13,7 @@
|
||||
#include <thread>
|
||||
|
||||
#include "common/EasyAssert.h"
|
||||
#include "common/Types.h"
|
||||
#include "fmt/format.h"
|
||||
#include "index/ScalarIndexSort.h"
|
||||
|
||||
@ -113,7 +114,7 @@ VectorFieldIndexing::GetDataFromIndex(const int64_t* seg_offsets,
|
||||
ids_ds->SetDim(1);
|
||||
ids_ds->SetIds(seg_offsets);
|
||||
ids_ds->SetIsOwner(false);
|
||||
if (field_meta_.get_data_type() == DataType::VECTOR_SPARSE_FLOAT) {
|
||||
if (IsSparseFloatVectorDataType(get_data_type())) {
|
||||
auto vector = index_->GetSparseVector(ids_ds);
|
||||
SparseRowsToProto(
|
||||
[vec_ptr = vector.get()](size_t i) { return vec_ptr + i; },
|
||||
@ -131,7 +132,7 @@ VectorFieldIndexing::AppendSegmentIndexSparse(int64_t reserved_offset,
|
||||
int64_t new_data_dim,
|
||||
const VectorBase* field_raw_data,
|
||||
const void* data_source) {
|
||||
auto conf = get_build_params(field_meta_.get_data_type());
|
||||
auto conf = get_build_params(get_data_type());
|
||||
auto source = dynamic_cast<const ConcurrentVector<SparseFloatVector>*>(
|
||||
field_raw_data);
|
||||
AssertInfo(source,
|
||||
@ -158,7 +159,7 @@ VectorFieldIndexing::AppendSegmentIndexSparse(int64_t reserved_offset,
|
||||
}
|
||||
} catch (SegcoreError& error) {
|
||||
LOG_ERROR("growing sparse index build error: {}", error.what());
|
||||
recreate_index(field_meta_.get_data_type(), nullptr);
|
||||
recreate_index(get_data_type(), nullptr);
|
||||
index_cur_ = 0;
|
||||
return;
|
||||
}
|
||||
@ -185,23 +186,22 @@ VectorFieldIndexing::AppendSegmentIndexDense(int64_t reserved_offset,
|
||||
int64_t size,
|
||||
const VectorBase* field_raw_data,
|
||||
const void* data_source) {
|
||||
AssertInfo(field_meta_.get_data_type() == DataType::VECTOR_FLOAT ||
|
||||
field_meta_.get_data_type() == DataType::VECTOR_FLOAT16 ||
|
||||
field_meta_.get_data_type() == DataType::VECTOR_BFLOAT16,
|
||||
AssertInfo(get_data_type() == DataType::VECTOR_FLOAT ||
|
||||
get_data_type() == DataType::VECTOR_FLOAT16 ||
|
||||
get_data_type() == DataType::VECTOR_BFLOAT16,
|
||||
"Data type of vector field is not in (VECTOR_FLOAT, "
|
||||
"VECTOR_FLOAT16,VECTOR_BFLOAT16)");
|
||||
auto dim = field_meta_.get_dim();
|
||||
auto conf = get_build_params(field_meta_.get_data_type());
|
||||
auto dim = get_dim();
|
||||
auto conf = get_build_params(get_data_type());
|
||||
auto size_per_chunk = field_raw_data->get_size_per_chunk();
|
||||
//append vector [vector_id_beg, vector_id_end] into index
|
||||
//build index [vector_id_beg, build_threshold) when index not exist
|
||||
AssertInfo(
|
||||
ConcurrentDenseVectorCheck(field_raw_data, field_meta_.get_data_type()),
|
||||
"vec_base can't cast to ConcurrentVector type");
|
||||
AssertInfo(ConcurrentDenseVectorCheck(field_raw_data, get_data_type()),
|
||||
"vec_base can't cast to ConcurrentVector type");
|
||||
size_t vec_length;
|
||||
if (field_meta_.get_data_type() == DataType::VECTOR_FLOAT) {
|
||||
if (get_data_type() == DataType::VECTOR_FLOAT) {
|
||||
vec_length = dim * sizeof(float);
|
||||
} else if (field_meta_.get_data_type() == DataType::VECTOR_FLOAT16) {
|
||||
} else if (get_data_type() == DataType::VECTOR_FLOAT16) {
|
||||
vec_length = dim * sizeof(float16);
|
||||
} else {
|
||||
vec_length = dim * sizeof(bfloat16);
|
||||
@ -248,7 +248,7 @@ VectorFieldIndexing::AppendSegmentIndexDense(int64_t reserved_offset,
|
||||
index_->BuildWithDataset(dataset, conf);
|
||||
} catch (SegcoreError& error) {
|
||||
LOG_ERROR("growing index build error: {}", error.what());
|
||||
recreate_index(field_meta_.get_data_type(), field_raw_data);
|
||||
recreate_index(get_data_type(), field_raw_data);
|
||||
return;
|
||||
}
|
||||
index_cur_.fetch_add(vec_num);
|
||||
@ -296,8 +296,8 @@ VectorFieldIndexing::AppendSegmentIndexDense(int64_t reserved_offset,
|
||||
knowhere::Json
|
||||
VectorFieldIndexing::get_build_params(DataType data_type) const {
|
||||
auto config = config_->GetBuildBaseParams(data_type);
|
||||
if (!IsSparseFloatVectorDataType(field_meta_.get_data_type())) {
|
||||
config[knowhere::meta::DIM] = std::to_string(field_meta_.get_dim());
|
||||
if (!IsSparseFloatVectorDataType(get_data_type())) {
|
||||
config[knowhere::meta::DIM] = std::to_string(get_dim());
|
||||
}
|
||||
config[knowhere::meta::NUM_BUILD_THREAD] = std::to_string(1);
|
||||
// for sparse float vector: drop_ratio_build config is not allowed to be set
|
||||
|
||||
@ -12,6 +12,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <optional>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
@ -26,6 +27,7 @@
|
||||
#include "common/Schema.h"
|
||||
#include "common/IndexMeta.h"
|
||||
#include "IndexConfigGenerator.h"
|
||||
#include "common/Types.h"
|
||||
#include "knowhere/config.h"
|
||||
#include "log/Log.h"
|
||||
#include "segcore/SegcoreConfig.h"
|
||||
@ -40,7 +42,12 @@ class FieldIndexing {
|
||||
public:
|
||||
explicit FieldIndexing(const FieldMeta& field_meta,
|
||||
const SegcoreConfig& segcore_config)
|
||||
: field_meta_(field_meta), segcore_config_(segcore_config) {
|
||||
: data_type_(field_meta.get_data_type()),
|
||||
dim_(IsVectorDataType(field_meta.get_data_type()) &&
|
||||
!IsSparseFloatVectorDataType(field_meta.get_data_type())
|
||||
? field_meta.get_dim()
|
||||
: 1),
|
||||
segcore_config_(segcore_config) {
|
||||
}
|
||||
FieldIndexing(const FieldIndexing&) = delete;
|
||||
FieldIndexing&
|
||||
@ -78,9 +85,14 @@ class FieldIndexing {
|
||||
return true;
|
||||
}
|
||||
|
||||
const FieldMeta&
|
||||
get_field_meta() {
|
||||
return field_meta_;
|
||||
DataType
|
||||
get_data_type() const {
|
||||
return data_type_;
|
||||
}
|
||||
|
||||
int64_t
|
||||
get_dim() const {
|
||||
return dim_;
|
||||
}
|
||||
|
||||
int64_t
|
||||
@ -96,7 +108,8 @@ class FieldIndexing {
|
||||
|
||||
protected:
|
||||
// additional info
|
||||
const FieldMeta& field_meta_;
|
||||
const DataType data_type_;
|
||||
const int64_t dim_;
|
||||
const SegcoreConfig& segcore_config_;
|
||||
};
|
||||
|
||||
@ -146,7 +159,7 @@ class ScalarFieldIndexing : public FieldIndexing {
|
||||
// concurrent
|
||||
PinWrapper<index::IndexBase*>
|
||||
get_chunk_indexing(int64_t chunk_id) const override {
|
||||
Assert(!field_meta_.is_vector());
|
||||
Assert(!IsVectorDataType(data_type_));
|
||||
return data_.at(chunk_id).get();
|
||||
}
|
||||
|
||||
@ -199,7 +212,7 @@ class VectorFieldIndexing : public FieldIndexing {
|
||||
// concurrent
|
||||
PinWrapper<index::IndexBase*>
|
||||
get_chunk_indexing(int64_t chunk_id) const override {
|
||||
Assert(field_meta_.is_vector());
|
||||
Assert(IsVectorDataType(data_type_));
|
||||
return PinWrapper<index::IndexBase*>(data_.at(chunk_id).get());
|
||||
}
|
||||
|
||||
@ -308,7 +321,7 @@ class IndexingRecord {
|
||||
return;
|
||||
}
|
||||
auto& indexing = field_indexings_.at(fieldId);
|
||||
auto type = indexing->get_field_meta().get_data_type();
|
||||
auto type = indexing->get_data_type();
|
||||
auto field_raw_data = record.get_data_base(fieldId);
|
||||
if (type == DataType::VECTOR_FLOAT &&
|
||||
reserved_offset + size >= indexing->get_build_threshold()) {
|
||||
@ -354,7 +367,7 @@ class IndexingRecord {
|
||||
return;
|
||||
}
|
||||
auto& indexing = field_indexings_.at(fieldId);
|
||||
auto type = indexing->get_field_meta().get_data_type();
|
||||
auto type = indexing->get_data_type();
|
||||
const void* p = data->Data();
|
||||
|
||||
if ((type == DataType::VECTOR_FLOAT ||
|
||||
@ -388,14 +401,11 @@ class IndexingRecord {
|
||||
void* output_raw) const {
|
||||
if (is_in(fieldId)) {
|
||||
auto& indexing = field_indexings_.at(fieldId);
|
||||
if (indexing->get_field_meta().get_data_type() ==
|
||||
DataType::VECTOR_FLOAT ||
|
||||
indexing->get_field_meta().get_data_type() ==
|
||||
DataType::VECTOR_FLOAT16 ||
|
||||
indexing->get_field_meta().get_data_type() ==
|
||||
DataType::VECTOR_BFLOAT16 ||
|
||||
indexing->get_field_meta().get_data_type() ==
|
||||
DataType::VECTOR_SPARSE_FLOAT) {
|
||||
auto data_type = indexing->get_data_type();
|
||||
if (data_type == DataType::VECTOR_FLOAT ||
|
||||
data_type == DataType::VECTOR_FLOAT16 ||
|
||||
data_type == DataType::VECTOR_BFLOAT16 ||
|
||||
data_type == DataType::VECTOR_SPARSE_FLOAT) {
|
||||
indexing->GetDataFromIndex(
|
||||
seg_offsets, count, element_size, output_raw);
|
||||
}
|
||||
|
||||
@ -1270,13 +1270,16 @@ void
|
||||
SegmentGrowingImpl::Reopen(SchemaPtr sch) {
|
||||
std::unique_lock lck(sch_mutex_);
|
||||
|
||||
auto absent_fields = sch->AbsentFields(*schema_);
|
||||
// double check condition, avoid multiple assignment
|
||||
if (sch->get_schema_version() > schema_->get_schema_version()) {
|
||||
auto absent_fields = sch->AbsentFields(*schema_);
|
||||
|
||||
for (const auto& field_meta : *absent_fields) {
|
||||
fill_empty_field(field_meta);
|
||||
for (const auto& field_meta : *absent_fields) {
|
||||
fill_empty_field(field_meta);
|
||||
}
|
||||
|
||||
schema_ = sch;
|
||||
}
|
||||
|
||||
schema_ = sch;
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@ -325,7 +325,7 @@ SegmentInternalInterface::get_field_avg_size(FieldId field_id) const {
|
||||
ThrowInfo(FieldIDInvalid, "unsupported system field id");
|
||||
}
|
||||
|
||||
auto schema = get_schema();
|
||||
auto& schema = get_schema();
|
||||
auto& field_meta = schema[field_id];
|
||||
auto data_type = field_meta.get_data_type();
|
||||
|
||||
@ -348,7 +348,7 @@ SegmentInternalInterface::set_field_avg_size(FieldId field_id,
|
||||
int64_t field_size) {
|
||||
AssertInfo(field_id.get() >= 0,
|
||||
"invalid field id, should be greater than or equal to 0");
|
||||
auto schema = get_schema();
|
||||
auto& schema = get_schema();
|
||||
auto& field_meta = schema[field_id];
|
||||
auto data_type = field_meta.get_data_type();
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user