mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
enhance: [AddField] Add log for segcore segment schema change (#43215)
Related to #39178 This PR add logs for segment schema change operations. Also fixes the nit comments from PR #42490 --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
7f8c5c9bb8
commit
f027eea545
@ -302,7 +302,7 @@ class Schema {
|
||||
}
|
||||
|
||||
bool
|
||||
ShallLoadField(FieldId field_id) {
|
||||
ShouldLoadField(FieldId field_id) {
|
||||
return load_fields_.empty() || load_fields_.count(field_id) > 0;
|
||||
}
|
||||
|
||||
|
||||
@ -32,11 +32,11 @@ struct FieldDataInfo {
|
||||
FieldDataInfo(int64_t field_id,
|
||||
size_t row_count,
|
||||
std::string mmap_dir_path = "",
|
||||
bool in_list = false)
|
||||
bool in_load_list = false)
|
||||
: field_id(field_id),
|
||||
row_count(row_count),
|
||||
mmap_dir_path(std::move(mmap_dir_path)),
|
||||
in_load_list(in_list) {
|
||||
in_load_list(in_load_list) {
|
||||
arrow_reader_channel = std::make_shared<ArrowReaderChannel>();
|
||||
}
|
||||
|
||||
|
||||
@ -293,7 +293,7 @@ ChunkedSegmentSealedImpl::load_column_group_data_internal(
|
||||
for (int i = 0; i < field_id_list.size(); ++i) {
|
||||
milvus_field_ids.push_back(FieldId(field_id_list.Get(i)));
|
||||
merged_in_load_list = merged_in_load_list ||
|
||||
schema_->ShallLoadField(milvus_field_ids[i]);
|
||||
schema_->ShouldLoadField(milvus_field_ids[i]);
|
||||
}
|
||||
|
||||
auto column_group_info = FieldDataInfo(column_group_id.get(),
|
||||
@ -360,7 +360,7 @@ ChunkedSegmentSealedImpl::load_field_data_internal(
|
||||
auto field_data_info = FieldDataInfo(field_id.get(),
|
||||
num_rows,
|
||||
load_info.mmap_dir_path,
|
||||
schema_->ShallLoadField(field_id));
|
||||
schema_->ShouldLoadField(field_id));
|
||||
LOG_INFO("segment {} loads field {} with num_rows {}, sorted by pk {}",
|
||||
this->get_segment_id(),
|
||||
field_id.get(),
|
||||
@ -1892,6 +1892,12 @@ ChunkedSegmentSealedImpl::RemoveFieldFile(const FieldId field_id) {
|
||||
void
|
||||
ChunkedSegmentSealedImpl::LazyCheckSchema(SchemaPtr sch) {
|
||||
if (sch->get_schema_version() > schema_->get_schema_version()) {
|
||||
LOG_INFO(
|
||||
"lazy check schema segment {} found newer schema version, current "
|
||||
"schema version {}, new schema version {}",
|
||||
id_,
|
||||
schema_->get_schema_version(),
|
||||
sch->get_schema_version());
|
||||
Reopen(sch);
|
||||
}
|
||||
}
|
||||
@ -1917,8 +1923,9 @@ ChunkedSegmentSealedImpl::load_field_data_common(
|
||||
}
|
||||
|
||||
if (!enable_mmap) {
|
||||
if (!is_proxy_column || is_proxy_column &&
|
||||
field_id.get() != DEFAULT_SHORT_COLUMN_GROUP_ID) {
|
||||
if (!is_proxy_column ||
|
||||
is_proxy_column &&
|
||||
field_id.get() != DEFAULT_SHORT_COLUMN_GROUP_ID) {
|
||||
stats_.mem_size += column->DataByteSize();
|
||||
}
|
||||
if (!IsVariableDataType(data_type) || IsStringDataType(data_type)) {
|
||||
@ -2017,9 +2024,14 @@ ChunkedSegmentSealedImpl::FinishLoad() {
|
||||
|
||||
void
|
||||
ChunkedSegmentSealedImpl::fill_empty_field(const FieldMeta& field_meta) {
|
||||
auto field_id = field_meta.get_id();
|
||||
LOG_INFO("start fill empty field {} (data type {}) for sealed segment {}",
|
||||
field_meta.get_data_type(),
|
||||
field_id.get(),
|
||||
id_);
|
||||
int64_t size = num_rows_.value();
|
||||
AssertInfo(size > 0, "Chunked Sealed segment must have more than 0 row");
|
||||
auto field_data_info = FieldDataInfo(field_meta.get_id().get(), size, "");
|
||||
auto field_data_info = FieldDataInfo(field_id.get(), size, "");
|
||||
std::unique_ptr<Translator<milvus::Chunk>> translator =
|
||||
std::make_unique<storagev1translator::DefaultValueChunkTranslator>(
|
||||
get_segment_id(), field_meta, field_data_info, false);
|
||||
@ -2053,9 +2065,13 @@ ChunkedSegmentSealedImpl::fill_empty_field(const FieldMeta& field_meta) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
auto field_id = field_meta.get_id();
|
||||
|
||||
fields_.emplace(field_id, column);
|
||||
set_bit(field_data_ready_bitset_, field_id, true);
|
||||
LOG_INFO("fill empty field {} (data type {}) for growing segment {} done",
|
||||
field_meta.get_data_type(),
|
||||
field_id.get(),
|
||||
id_);
|
||||
}
|
||||
|
||||
} // namespace milvus::segcore
|
||||
|
||||
@ -479,10 +479,6 @@ CreateSealedSegment(
|
||||
const SegcoreConfig& segcore_config = SegcoreConfig::default_config(),
|
||||
bool is_sorted_by_pk = false) {
|
||||
return std::make_unique<ChunkedSegmentSealedImpl>(
|
||||
schema,
|
||||
index_meta,
|
||||
segcore_config,
|
||||
segment_id,
|
||||
is_sorted_by_pk);
|
||||
schema, index_meta, segcore_config, segment_id, is_sorted_by_pk);
|
||||
}
|
||||
} // namespace milvus::segcore
|
||||
|
||||
@ -101,6 +101,12 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset,
|
||||
field_id_to_offset.emplace(field_id, field_offset++);
|
||||
// may be added field, add the null if has existed data
|
||||
if (exist_rows > 0 && !insert_record_.is_data_exist(field_id)) {
|
||||
LOG_WARN(
|
||||
"heterogeneous insert data found for segment {}, field id {}, "
|
||||
"data type {}",
|
||||
id_,
|
||||
field_id.get(),
|
||||
field.type());
|
||||
schema_->AddField(FieldName(field.field_name()),
|
||||
field_id,
|
||||
DataType(field.type()),
|
||||
@ -124,6 +130,13 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset,
|
||||
if (field_id_to_offset.count(field_id) > 0) {
|
||||
continue;
|
||||
}
|
||||
LOG_INFO(
|
||||
"schema newer than insert data found for segment {}, attach empty "
|
||||
"field data"
|
||||
"not exist field {}, data type {}",
|
||||
id_,
|
||||
field_id.get(),
|
||||
field_meta.get_data_type());
|
||||
auto data = bulk_subscript_not_exist_field(field_meta, num_rows);
|
||||
insert_record_proto->add_fields_data()->CopyFrom(*data);
|
||||
field_id_to_offset.emplace(field_id, field_offset++);
|
||||
@ -500,8 +513,10 @@ SegmentGrowingImpl::load_column_group_data_internal(
|
||||
auto field_data = storage::CreateFieldData(
|
||||
data_type,
|
||||
field.second.is_nullable(),
|
||||
IsVectorDataType(data_type) && !IsSparseFloatVectorDataType(data_type) ? field.second.get_dim()
|
||||
: 1,
|
||||
IsVectorDataType(data_type) &&
|
||||
!IsSparseFloatVectorDataType(data_type)
|
||||
? field.second.get_dim()
|
||||
: 1,
|
||||
batch_num_rows);
|
||||
field_data->FillFieldData(table->column(i));
|
||||
field_data_map[FieldId(field_id)].push_back(field_data);
|
||||
@ -1236,6 +1251,12 @@ SegmentGrowingImpl::BulkGetJsonData(
|
||||
void
|
||||
SegmentGrowingImpl::LazyCheckSchema(SchemaPtr sch) {
|
||||
if (sch->get_schema_version() > schema_->get_schema_version()) {
|
||||
LOG_INFO(
|
||||
"lazy check schema segment {} found newer schema version, current "
|
||||
"schema version {}, new schema version {}",
|
||||
id_,
|
||||
schema_->get_schema_version(),
|
||||
sch->get_schema_version());
|
||||
Reopen(sch);
|
||||
}
|
||||
}
|
||||
@ -1271,6 +1292,10 @@ SegmentGrowingImpl::FinishLoad() {
|
||||
void
|
||||
SegmentGrowingImpl::fill_empty_field(const FieldMeta& field_meta) {
|
||||
auto field_id = field_meta.get_id();
|
||||
LOG_INFO("start fill empty field {} (data type {}) for growing segment {}",
|
||||
field_meta.get_data_type(),
|
||||
field_id.get(),
|
||||
id_);
|
||||
// append meta only needed when schema is old
|
||||
// loading old segment with new schema will have meta appended
|
||||
if (!insert_record_.is_data_exist(field_id)) {
|
||||
@ -1286,9 +1311,10 @@ SegmentGrowingImpl::fill_empty_field(const FieldMeta& field_meta) {
|
||||
insert_record_.get_data_base(field_id)->set_data_raw(
|
||||
0, total_row_num, data.get(), field_meta);
|
||||
|
||||
LOG_INFO("Growing segment {} fill empty field {} done",
|
||||
this->get_segment_id(),
|
||||
field_meta.get_id().get());
|
||||
LOG_INFO("fill empty field {} (data type {}) for growing segment {} done",
|
||||
field_meta.get_data_type(),
|
||||
field_id.get(),
|
||||
id_);
|
||||
}
|
||||
|
||||
} // namespace milvus::segcore
|
||||
|
||||
@ -97,6 +97,11 @@ func (m *collectionManager) PutOrRef(collectionID int64, schema *schemapb.Collec
|
||||
collection.schema.Store(schema)
|
||||
collection.ccollection.UpdateSchema(schema, loadMeta.GetSchemaVersion())
|
||||
collection.schemaVersion = loadMeta.GetSchemaVersion()
|
||||
log.Info("update collection schema",
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.Uint64("schemaVersion", loadMeta.GetSchemaVersion()),
|
||||
zap.Any("schema", schema),
|
||||
)
|
||||
}
|
||||
collection.Ref(1)
|
||||
return nil
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user