mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: [AddField] Add shared_lock for insert prevent race (#43229)
Related to #43113 When schema change happens, insert shall not happen, otherwise: - Data race may happen causing insertion failure - Inconsistent data schema This PR add shared_lock prevent this data race. --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
a90694165b
commit
6bbed3b019
@ -357,10 +357,11 @@ ChunkedSegmentSealedImpl::load_field_data_internal(
|
||||
|
||||
auto field_id = FieldId(id);
|
||||
|
||||
auto field_data_info = FieldDataInfo(field_id.get(),
|
||||
num_rows,
|
||||
load_info.mmap_dir_path,
|
||||
schema_->ShouldLoadField(field_id));
|
||||
auto field_data_info =
|
||||
FieldDataInfo(field_id.get(),
|
||||
num_rows,
|
||||
load_info.mmap_dir_path,
|
||||
schema_->ShouldLoadField(field_id));
|
||||
LOG_INFO("segment {} loads field {} with num_rows {}, sorted by pk {}",
|
||||
this->get_segment_id(),
|
||||
field_id.get(),
|
||||
|
||||
@ -90,6 +90,11 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset,
|
||||
InsertRecordProto* insert_record_proto) {
|
||||
AssertInfo(insert_record_proto->num_rows() == num_rows,
|
||||
"Entities_raw count not equal to insert size");
|
||||
// protect schema being changed during insert
|
||||
// schema change cannot happends during insertion,
|
||||
// otherwise, there might be some data not following new schema
|
||||
std::shared_lock lck(sch_mutex_);
|
||||
|
||||
// step 1: check insert data if valid
|
||||
std::unordered_map<FieldId, int64_t> field_id_to_offset;
|
||||
int64_t field_offset = 0;
|
||||
@ -1263,7 +1268,7 @@ SegmentGrowingImpl::LazyCheckSchema(SchemaPtr sch) {
|
||||
|
||||
void
|
||||
SegmentGrowingImpl::Reopen(SchemaPtr sch) {
|
||||
std::unique_lock lck(mutex_);
|
||||
std::unique_lock lck(sch_mutex_);
|
||||
|
||||
auto absent_fields = sch->AbsentFields(*schema_);
|
||||
|
||||
|
||||
@ -13,6 +13,7 @@
|
||||
|
||||
#include <atomic>
|
||||
#include <memory>
|
||||
#include <shared_mutex>
|
||||
#include <string>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
@ -546,6 +547,9 @@ class SegmentInternalInterface : public SegmentInterface {
|
||||
search_pk(const PkType& pk, Timestamp timestamp) const = 0;
|
||||
|
||||
protected:
|
||||
// mutex protecting rw options on schema_
|
||||
std::shared_mutex sch_mutex_;
|
||||
|
||||
mutable std::shared_mutex mutex_;
|
||||
// fieldID -> std::pair<num_rows, avg_size>
|
||||
std::unordered_map<FieldId, std::pair<int64_t, int64_t>>
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user