fix: [AddField] Add protection logic inserting old data into new schema (#41978)

Related to #39718

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

commit f021b3f26a
parent dad43a3894
@@ -1465,10 +1465,10 @@ ChunkedSegmentSealedImpl::HasRawData(int64_t field_id) const {
             get_bit(binlog_index_bitset_, fieldID)) {
             AssertInfo(vector_indexings_.is_ready(fieldID),
                        "vector index is not ready");
-            AssertInfo(index_has_raw_data_.find(fieldID) !=
-                           index_has_raw_data_.end(),
-                       "index_has_raw_data_ is not set for fieldID: " +
-                           std::to_string(fieldID.get()));
+            AssertInfo(
+                index_has_raw_data_.find(fieldID) != index_has_raw_data_.end(),
+                "index_has_raw_data_ is not set for fieldID: " +
+                    std::to_string(fieldID.get()));
             return index_has_raw_data_.at(fieldID);
         }
     } else if (IsJsonDataType(field_meta.get_data_type())) {
@@ -1476,10 +1476,10 @@ ChunkedSegmentSealedImpl::HasRawData(int64_t field_id) const {
     } else {
         auto scalar_index = scalar_indexings_.find(fieldID);
         if (scalar_index != scalar_indexings_.end()) {
-            AssertInfo(index_has_raw_data_.find(fieldID) !=
-                           index_has_raw_data_.end(),
-                       "index_has_raw_data_ is not set for fieldID: " +
-                           std::to_string(fieldID.get()));
+            AssertInfo(
+                index_has_raw_data_.find(fieldID) != index_has_raw_data_.end(),
+                "index_has_raw_data_ is not set for fieldID: " +
+                    std::to_string(fieldID.get()));
             return index_has_raw_data_.at(fieldID);
         }
     }
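The two hunks above only reflow an existing guard, but the guard itself is the point of the code: it asserts that index_has_raw_data_ actually contains the field before calling .at(), so a missing entry fails with a readable message instead of an opaque std::out_of_range. Below is a minimal standalone sketch of that checked-lookup pattern, using a plain std::map and a toy AssertInfo helper rather than the Milvus macro; all names in the sketch are illustrative only.

// Sketch only: checked map lookup analogous to the AssertInfo guard above.
// AssertInfo here is a toy helper, not the Milvus macro.
#include <cstdint>
#include <map>
#include <stdexcept>
#include <string>

static void AssertInfo(bool ok, const std::string& msg) {
    if (!ok) {
        throw std::runtime_error(msg);
    }
}

bool HasRawData(const std::map<int64_t, bool>& index_has_raw_data, int64_t field_id) {
    // Fail loudly with the offending field id instead of letting .at()
    // throw a bare std::out_of_range.
    AssertInfo(index_has_raw_data.find(field_id) != index_has_raw_data.end(),
               "index_has_raw_data_ is not set for fieldID: " + std::to_string(field_id));
    return index_has_raw_data.at(field_id);
}

int main() {
    std::map<int64_t, bool> index_has_raw_data{{101, true}};
    return HasRawData(index_has_raw_data, 101) ? 0 : 1;
}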
@@ -32,7 +32,7 @@ class SegmentGrowing : public SegmentInternalInterface {
            int64_t size,
            const int64_t* row_ids,
            const Timestamp* timestamps,
-           const InsertRecordProto* insert_record_proto) = 0;
+           InsertRecordProto* insert_record_proto) = 0;
 
     SegmentType
     type() const override {
@@ -88,7 +88,7 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset,
                            int64_t num_rows,
                            const int64_t* row_ids,
                            const Timestamp* timestamps_raw,
-                           const InsertRecordProto* insert_record_proto) {
+                           InsertRecordProto* insert_record_proto) {
     AssertInfo(insert_record_proto->num_rows() == num_rows,
                "Entities_raw count not equal to insert size");
     // step 1: check insert data if valid
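This hunk, together with the matching declarations in SegmentGrowing and SegmentGrowingImpl, drops the const from the InsertRecordProto* parameter because Insert now appends generated columns to the record for fields that the caller's older schema did not include, as the next hunk shows. A toy before/after sketch of that change is below; InsertRecord is a hypothetical stand-in struct, not the real protobuf type.

// Sketch only: why the parameter can no longer be const. InsertRecord is a
// toy stand-in for the generated InsertRecordProto message.
#include <cstdint>
#include <string>
#include <vector>

struct FieldColumn {
    std::string name;
    std::vector<int64_t> values;
};

struct InsertRecord {
    std::vector<FieldColumn> fields_data;
};

// Before: void Insert(const InsertRecord* record, int64_t num_rows);
// After: the segment may append a generated column for a field that was added
// to the schema after the caller built its insert payload.
void Insert(InsertRecord* record, int64_t num_rows) {
    record->fields_data.push_back(
        {"newly_added_field", std::vector<int64_t>(num_rows, /*default=*/0)});
}

int main() {
    InsertRecord record;
    Insert(&record, 3);
    return record.fields_data.size() == 1 ? 0 : 1;
}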
@@ -116,6 +116,20 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset,
         }
     }
 
+    // segment have latest schema while insert used old one
+    // need to fill insert data with field_meta
+    for (auto& [field_id, field_meta] : schema_->get_fields()) {
+        if (field_id.get() < START_USER_FIELDID) {
+            continue;
+        }
+        if (field_id_to_offset.count(field_id) > 0) {
+            continue;
+        }
+        auto data = bulk_subscript_not_exist_field(field_meta, num_rows);
+        insert_record_proto->add_fields_data()->CopyFrom(*data);
+        field_id_to_offset.emplace(field_id, field_offset++);
+    }
+
     // step 2: sort timestamp
     // query node already guarantees that the timestamp is ordered, avoid field data copy in c++
 
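The loop added above is the core of the protection logic: any user field present in the segment's latest schema but missing from the incoming InsertRecordProto gets a generated column (via bulk_subscript_not_exist_field) appended before the data is written, so inserts built against an older schema still line up with the new schema. Below is a minimal standalone sketch of the same fill-the-gaps pattern, using plain STL containers instead of Milvus's Schema and InsertRecordProto types; the names and the value of kStartUserFieldId are illustrative assumptions mirroring START_USER_FIELDID, not the actual Milvus API.

// Sketch only: simplified analog of the "fill missing fields" loop above.
// FieldId, FieldMeta, and the payload map are stand-ins, not Milvus types.
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

using FieldId = int64_t;
constexpr FieldId kStartUserFieldId = 100;  // assumption mirroring START_USER_FIELDID

struct FieldMeta {
    std::string name;
    int64_t default_value;  // stand-in for the real default/null handling
};

// Column generated for a field the old client did not send.
std::vector<int64_t> MakeMissingColumn(const FieldMeta& meta, int64_t num_rows) {
    return std::vector<int64_t>(num_rows, meta.default_value);
}

int main() {
    // Latest schema on the segment: field 102 was added after the client cached its schema.
    std::map<FieldId, FieldMeta> schema = {{101, {"age", 0}}, {102, {"score", -1}}};

    // Insert payload built against the old schema: only field 101 is present.
    std::map<FieldId, std::vector<int64_t>> payload = {{101, {30, 41, 27}}};
    const int64_t num_rows = 3;

    // Protection logic: fill every user field missing from the payload.
    for (const auto& [field_id, meta] : schema) {
        if (field_id < kStartUserFieldId) continue;   // skip system fields
        if (payload.count(field_id) > 0) continue;    // already provided by the caller
        payload.emplace(field_id, MakeMissingColumn(meta, num_rows));
    }

    for (const auto& [field_id, column] : payload) {
        std::cout << schema[field_id].name << " has " << column.size() << " rows\n";
    }
    return 0;
}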
@@ -125,7 +139,7 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset,
 
     // update the mem size of timestamps and row IDs
     stats_.mem_size += num_rows * (sizeof(Timestamp) + sizeof(idx_t));
-    for (auto [field_id, field_meta] : schema_->get_fields()) {
+    for (auto& [field_id, field_meta] : schema_->get_fields()) {
         if (field_id.get() < START_USER_FIELDID) {
             continue;
         }
@@ -48,7 +48,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
            int64_t size,
            const int64_t* row_ids,
            const Timestamp* timestamps,
-           const InsertRecordProto* insert_record_proto) override;
+           InsertRecordProto* insert_record_proto) override;
 
     bool
     Contain(const PkType& pk) const override {
@@ -71,7 +71,8 @@ DefaultValueChunkTranslator::get_cells(
         milvus::storage::CreateArrowBuilder(field_meta_.get_data_type());
     arrow::Status ast;
     if (field_meta_.default_value().has_value()) {
-        builder->Reserve(num_rows);
+        ast = builder->Reserve(num_rows);
+        AssertInfo(ast.ok(), "reserve arrow build failed: {}", ast.ToString());
         auto scalar = storage::CreateArrowScalarFromDefaultValue(field_meta_);
         ast = builder->AppendScalar(*scalar, num_rows);
     } else {
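The last hunk stops discarding the arrow::Status returned by Reserve and asserts on it, matching how the subsequent AppendScalar result is already handled. Below is a minimal, hedged sketch of that status-checked builder pattern using stock Arrow C++ APIs (a plain Int64Builder and MakeScalar rather than Milvus's CreateArrowBuilder and CreateArrowScalarFromDefaultValue helpers, which are not reproduced here).

// Sketch only: status-checked Arrow builder usage analogous to the hunk above.
#include <arrow/api.h>
#include <iostream>
#include <memory>

int main() {
    const int64_t num_rows = 4;
    arrow::Int64Builder builder;

    // Reserve returns arrow::Status; check it instead of dropping it.
    arrow::Status st = builder.Reserve(num_rows);
    if (!st.ok()) {
        std::cerr << "reserve arrow builder failed: " << st.ToString() << std::endl;
        return 1;
    }

    // Repeat a default value num_rows times, again checking the status.
    std::shared_ptr<arrow::Scalar> default_value = arrow::MakeScalar(int64_t{42});
    st = builder.AppendScalar(*default_value, num_rows);
    if (!st.ok()) {
        std::cerr << "append scalar failed: " << st.ToString() << std::endl;
        return 1;
    }

    std::shared_ptr<arrow::Array> array;
    st = builder.Finish(&array);
    if (!st.ok()) {
        std::cerr << "finish failed: " << st.ToString() << std::endl;
        return 1;
    }
    std::cout << array->ToString() << std::endl;
    return 0;
}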