enhance: [AddField] Trigger check schema in retrieve as well (#41598)

Related to #39718
Fixes milvus-io/pymilvus#2771

This PR:
- Make AsyncRetrieve task triggers "schema check" logic as well
- Rename `AddField` related methods to align with code standard

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2025-04-29 14:10:49 +08:00 committed by GitHub
parent 910f68c986
commit f3f8227cd0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 24 additions and 22 deletions

View File

@ -1650,14 +1650,14 @@ ChunkedSegmentSealedImpl::RemoveFieldFile(const FieldId field_id) {
}
void
ChunkedSegmentSealedImpl::lazy_check_schema(const query::Plan* plan) {
if (plan->schema_.get_schema_version() > schema_->get_schema_version()) {
reopen(std::make_shared<Schema>(plan->schema_));
ChunkedSegmentSealedImpl::LazyCheckSchema(const Schema& sch) {
if (sch.get_schema_version() > schema_->get_schema_version()) {
Reopen(std::make_shared<Schema>(sch));
}
}
void
ChunkedSegmentSealedImpl::reopen(SchemaPtr sch) {
ChunkedSegmentSealedImpl::Reopen(SchemaPtr sch) {
std::unique_lock lck(mutex_);
field_data_ready_bitset_.resize(sch->size());
@ -1678,7 +1678,7 @@ ChunkedSegmentSealedImpl::reopen(SchemaPtr sch) {
}
void
ChunkedSegmentSealedImpl::finish_load() {
ChunkedSegmentSealedImpl::FinishLoad() {
std::unique_lock lck(mutex_);
for (const auto& [field_id, field_meta] : schema_->get_fields()) {
if (field_id.get() < START_USER_FIELDID) {

View File

@ -133,13 +133,13 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
}
void
reopen(SchemaPtr sch) override;
Reopen(SchemaPtr sch) override;
void
lazy_check_schema(const query::Plan* plan) override;
LazyCheckSchema(const Schema& sch) override;
void
finish_load() override;
FinishLoad() override;
public:
size_t

View File

@ -1171,14 +1171,14 @@ SegmentGrowingImpl::GetJsonData(FieldId field_id, size_t offset) const {
}
void
SegmentGrowingImpl::lazy_check_schema(const query::Plan* plan) {
if (plan->schema_.get_schema_version() > schema_->get_schema_version()) {
reopen(std::make_shared<Schema>(plan->schema_));
SegmentGrowingImpl::LazyCheckSchema(const Schema& sch) {
if (sch.get_schema_version() > schema_->get_schema_version()) {
Reopen(std::make_shared<Schema>(sch));
}
}
void
SegmentGrowingImpl::reopen(SchemaPtr sch) {
SegmentGrowingImpl::Reopen(SchemaPtr sch) {
std::unique_lock lck(mutex_);
auto absent_fields = sch->absent_fields(*schema_);
@ -1191,7 +1191,7 @@ SegmentGrowingImpl::reopen(SchemaPtr sch) {
}
void
SegmentGrowingImpl::finish_load() {
SegmentGrowingImpl::FinishLoad() {
for (const auto& [field_id, field_meta] : schema_->get_fields()) {
if (field_id.get() < START_USER_FIELDID) {
continue;

View File

@ -100,13 +100,13 @@ class SegmentGrowingImpl : public SegmentGrowing {
size_t num_rows);
void
reopen(SchemaPtr sch) override;
Reopen(SchemaPtr sch) override;
void
lazy_check_schema(const query::Plan* plan) override;
LazyCheckSchema(const Schema& sch) override;
void
finish_load() override;
FinishLoad() override;
public:
const InsertRecord<false>&

View File

@ -146,16 +146,16 @@ class SegmentInterface {
GetJsonData(FieldId field_id, size_t offset) const = 0;
virtual void
lazy_check_schema(const query::Plan* plan) = 0;
LazyCheckSchema(const Schema& sch) = 0;
// reopen segment with new schema
virtual void
reopen(SchemaPtr sch) = 0;
Reopen(SchemaPtr sch) = 0;
// finish_load notifies the segment that all load operation are done
// FinishLoad notifies the segment that all load operation are done
// currently it's used to sync field data list with updated schema.
virtual void
finish_load() = 0;
FinishLoad() = 0;
};
// internal API for DSL calculation

View File

@ -124,7 +124,7 @@ AsyncSearch(CTraceContext c_trace,
auto span = milvus::tracer::StartSpan("SegCoreSearch", &trace_ctx);
milvus::tracer::SetRootSpan(span);
segment->lazy_check_schema(plan);
segment->LazyCheckSchema(plan->schema_);
auto search_result =
segment->Search(plan, phg_ptr, timestamp, consistency_level);
@ -194,6 +194,8 @@ AsyncRetrieve(CTraceContext c_trace,
c_trace.traceID, c_trace.spanID, c_trace.traceFlags};
milvus::tracer::AutoSpan span("SegCoreRetrieve", &trace_ctx, true);
segment->LazyCheckSchema(plan->schema_);
auto retrieve_result = segment->Retrieve(&trace_ctx,
plan,
timestamp,
@ -590,7 +592,7 @@ FinishLoad(CSegmentInterface c_segment) {
try {
auto segment_interface =
reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
segment_interface->finish_load();
segment_interface->FinishLoad();
return milvus::SuccessCStatus();
} catch (std::exception& e) {
return milvus::FailureCStatus(milvus::UnexpectedError, e.what());